From: Kai Moritz Date: Wed, 28 Dec 2022 15:58:34 +0000 (+0100) Subject: refactor: Refined packaging (moved classes to new folders and files) X-Git-Tag: wip~97 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=7414187b1d200ee3f837a509d22e55dda3a00d32;p=demos%2Fkafka%2Fchat refactor: Refined packaging (moved classes to new folders and files) --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java new file mode 100644 index 00000000..287c9ee0 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java @@ -0,0 +1,24 @@ +package de.juplo.kafka.chatroom; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; + +import java.time.Clock; + + +@SpringBootApplication +public class ChatroomApplication +{ + @Bean + public Clock clock() + { + return Clock.systemDefaultZone(); + } + + + public static void main(String[] args) + { + SpringApplication.run(ChatroomApplication.class, args); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java new file mode 100644 index 00000000..d6130927 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java @@ -0,0 +1,89 @@ +package de.juplo.kafka.chatroom.api; + +import de.juplo.kafka.chatroom.domain.Chatroom; +import lombok.RequiredArgsConstructor; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.*; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.Clock; +import java.time.LocalDateTime; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + + +@RestController +@RequiredArgsConstructor +public class ChatroomController +{ + private final Map chatrooms = new HashMap<>(); + private final Clock clock; + + + @PostMapping("create") + public Chatroom create(@RequestBody String name) + { + Chatroom chatroom = new Chatroom(UUID.randomUUID(), name); + chatrooms.put(chatroom.getId(), chatroom); + return chatroom; + } + + @GetMapping("list") + public Collection list() + { + return chatrooms.values(); + } + + @GetMapping("get/{chatroomId}") + public Chatroom get(@PathVariable UUID chatroomId) + { + return chatrooms.get(chatroomId); + } + + @PutMapping("put/{chatroomId}/{username}/{messageId}") + public Mono put( + @PathVariable UUID chatroomId, + @PathVariable String username, + @PathVariable Long messageId, + @RequestBody String text) + { + Chatroom chatroom = chatrooms.get(chatroomId); + return + chatroom + .addMessage( + messageId, + LocalDateTime.now(clock), + username, + text) + .switchIfEmpty(chatroom.getMessage(username, messageId)) + .map(message -> MessageTo.from(message)); + } + + @GetMapping("get/{chatroomId}/{username}/{messageId}") + public Mono get( + @PathVariable UUID chatroomId, + @PathVariable String username, + @PathVariable Long messageId) + { + return + chatrooms + .get(chatroomId) + .getMessage(username, messageId) + .map(message -> MessageTo.from(message)); + } + + @GetMapping( + path = "listen/{chatroomId}", + produces = MediaType.TEXT_EVENT_STREAM_VALUE) + public Flux listen(@PathVariable UUID chatroomId) + { + return chatrooms + .get(chatroomId) + .listen() + .log() + .map(message -> MessageTo.from(message)); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerAdvice.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerAdvice.java new file mode 100644 index 00000000..42f7599e --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerAdvice.java @@ -0,0 +1,55 @@ +package de.juplo.kafka.chatroom.api; + +import de.juplo.kafka.chatroom.domain.MessageMutationException; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.HttpStatus; +import org.springframework.http.ProblemDetail; +import org.springframework.web.bind.annotation.ControllerAdvice; +import org.springframework.web.bind.annotation.ExceptionHandler; +import org.springframework.web.server.ServerWebExchange; +import org.springframework.web.util.UriComponentsBuilder; + +import java.util.Date; + + +@ControllerAdvice +public class ChatroomControllerAdvice +{ + @Value("${server.context-path:/}") + String contextPath; + + @ExceptionHandler(MessageMutationException.class) + public final ProblemDetail handleException( + MessageMutationException e, + ServerWebExchange exchange, + UriComponentsBuilder uriComponentsBuilder) + { + final HttpStatus status = HttpStatus.BAD_REQUEST; + ProblemDetail problem = ProblemDetail.forStatus(status); + + problem.setProperty("timestamp", new Date()); + + problem.setProperty("requestId", exchange.getRequest().getId()); + + problem.setType(uriComponentsBuilder.replacePath(contextPath).path("/problem/message-mutation").build().toUri()); + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append(status.getReasonPhrase()); + stringBuilder.append(" - "); + stringBuilder.append(e.getMessage()); + problem.setTitle(stringBuilder.toString()); + + stringBuilder.setLength(0); + stringBuilder.append("The existing message with user="); + stringBuilder.append(e.getExisting().getUser()); + stringBuilder.append(" and id="); + stringBuilder.append(e.getExisting().getId()); + stringBuilder.append(" cannot be mutated!"); + problem.setDetail(stringBuilder.toString()); + + problem.setProperty("mutatedMessage", e.getMutated()); + + problem.setProperty("existingMessage", e.getExisting()); + + return problem; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/MessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/api/MessageTo.java new file mode 100644 index 00000000..c7713865 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/api/MessageTo.java @@ -0,0 +1,30 @@ +package de.juplo.kafka.chatroom.api; + +import de.juplo.kafka.chatroom.domain.Message; +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.time.LocalDateTime; + + +@Data +@AllArgsConstructor +public class MessageTo +{ + private Long id; + private Long serialNumber; + private LocalDateTime timestamp; + private String user; + private String text; + + public static MessageTo from(Message message) + { + return + new MessageTo( + message.getId(), + message.getSerialNumber(), + message.getTimestamp(), + message.getUser(), + message.getText()); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/Chatroom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/Chatroom.java new file mode 100644 index 00000000..62d33f27 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/Chatroom.java @@ -0,0 +1,90 @@ +package de.juplo.kafka.chatroom.domain; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Value; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; + +import java.time.LocalDateTime; +import java.util.*; +import java.util.stream.Stream; + + +@RequiredArgsConstructor +@Slf4j +public class Chatroom +{ + @Getter + private final UUID id; + @Getter + private final String name; + private final LinkedHashMap messages = new LinkedHashMap<>(); + private final Sinks.Many sink = Sinks.many().multicast().onBackpressureBuffer(); + + synchronized public Mono addMessage( + Long id, + LocalDateTime timestamp, + String user, + String text) + { + return persistMessage(id, timestamp, user, text) + .doOnNext(message -> sink.tryEmitNext(message).orThrow()); + } + + private Mono persistMessage( + Long id, + LocalDateTime timestamp, + String user, + String text) + { + Message message = new Message(id, (long)messages.size(), timestamp, user, text); + + MessageKey key = new MessageKey(user, id); + Message existing = messages.get(key); + if (existing != null) + { + log.info("Message with key {} already exists; {}", key, existing); + if (!message.equals(existing)) + throw new MessageMutationException(message, existing); + return Mono.empty(); + } + + messages.put(key, message); + return Mono + .fromSupplier(() -> message) + .log(); + } + + public Mono getMessage(String username, Long messageId) + { + return Mono.fromSupplier(() -> + { + MessageKey key = MessageKey.of(username, messageId); + return messages.get(key); + }); + } + + public Flux listen() + { + return sink.asFlux(); + } + + public Stream getMessages(long firstMessage) + { + return messages + .values() + .stream() + .filter(message -> message.getSerialNumber() >= firstMessage); + } + + + @Value(staticConstructor = "of") + static class MessageKey + { + String username; + Long messageId; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/Message.java b/src/main/java/de/juplo/kafka/chat/backend/domain/Message.java new file mode 100644 index 00000000..84cddede --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/Message.java @@ -0,0 +1,22 @@ +package de.juplo.kafka.chatroom.domain; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.ToString; + +import java.time.LocalDateTime; + + +@RequiredArgsConstructor +@Getter +@EqualsAndHashCode +@ToString +public class Message +{ + private final Long id; + private final Long serialNumber; + private final LocalDateTime timestamp; + private final String user; + private final String text; +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/MessageMutationException.java b/src/main/java/de/juplo/kafka/chat/backend/domain/MessageMutationException.java new file mode 100644 index 00000000..db4ebc24 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/MessageMutationException.java @@ -0,0 +1,19 @@ +package de.juplo.kafka.chatroom.domain; + +import lombok.Getter; + + +public class MessageMutationException extends RuntimeException +{ + @Getter + private final Message mutated; + @Getter + private final Message existing; + + public MessageMutationException(Message mutated, Message existing) + { + super("Messages are imutable!"); + this.mutated = mutated; + this.existing = existing; + } +} diff --git a/src/main/java/de/juplo/kafka/chatroom/ChatroomApplication.java b/src/main/java/de/juplo/kafka/chatroom/ChatroomApplication.java deleted file mode 100644 index 287c9ee0..00000000 --- a/src/main/java/de/juplo/kafka/chatroom/ChatroomApplication.java +++ /dev/null @@ -1,24 +0,0 @@ -package de.juplo.kafka.chatroom; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.annotation.Bean; - -import java.time.Clock; - - -@SpringBootApplication -public class ChatroomApplication -{ - @Bean - public Clock clock() - { - return Clock.systemDefaultZone(); - } - - - public static void main(String[] args) - { - SpringApplication.run(ChatroomApplication.class, args); - } -} diff --git a/src/main/java/de/juplo/kafka/chatroom/api/ChatroomController.java b/src/main/java/de/juplo/kafka/chatroom/api/ChatroomController.java deleted file mode 100644 index d6130927..00000000 --- a/src/main/java/de/juplo/kafka/chatroom/api/ChatroomController.java +++ /dev/null @@ -1,89 +0,0 @@ -package de.juplo.kafka.chatroom.api; - -import de.juplo.kafka.chatroom.domain.Chatroom; -import lombok.RequiredArgsConstructor; -import org.springframework.http.MediaType; -import org.springframework.web.bind.annotation.*; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.time.Clock; -import java.time.LocalDateTime; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - - -@RestController -@RequiredArgsConstructor -public class ChatroomController -{ - private final Map chatrooms = new HashMap<>(); - private final Clock clock; - - - @PostMapping("create") - public Chatroom create(@RequestBody String name) - { - Chatroom chatroom = new Chatroom(UUID.randomUUID(), name); - chatrooms.put(chatroom.getId(), chatroom); - return chatroom; - } - - @GetMapping("list") - public Collection list() - { - return chatrooms.values(); - } - - @GetMapping("get/{chatroomId}") - public Chatroom get(@PathVariable UUID chatroomId) - { - return chatrooms.get(chatroomId); - } - - @PutMapping("put/{chatroomId}/{username}/{messageId}") - public Mono put( - @PathVariable UUID chatroomId, - @PathVariable String username, - @PathVariable Long messageId, - @RequestBody String text) - { - Chatroom chatroom = chatrooms.get(chatroomId); - return - chatroom - .addMessage( - messageId, - LocalDateTime.now(clock), - username, - text) - .switchIfEmpty(chatroom.getMessage(username, messageId)) - .map(message -> MessageTo.from(message)); - } - - @GetMapping("get/{chatroomId}/{username}/{messageId}") - public Mono get( - @PathVariable UUID chatroomId, - @PathVariable String username, - @PathVariable Long messageId) - { - return - chatrooms - .get(chatroomId) - .getMessage(username, messageId) - .map(message -> MessageTo.from(message)); - } - - @GetMapping( - path = "listen/{chatroomId}", - produces = MediaType.TEXT_EVENT_STREAM_VALUE) - public Flux listen(@PathVariable UUID chatroomId) - { - return chatrooms - .get(chatroomId) - .listen() - .log() - .map(message -> MessageTo.from(message)); - } -} diff --git a/src/main/java/de/juplo/kafka/chatroom/api/ChatroomControllerAdvice.java b/src/main/java/de/juplo/kafka/chatroom/api/ChatroomControllerAdvice.java deleted file mode 100644 index 42f7599e..00000000 --- a/src/main/java/de/juplo/kafka/chatroom/api/ChatroomControllerAdvice.java +++ /dev/null @@ -1,55 +0,0 @@ -package de.juplo.kafka.chatroom.api; - -import de.juplo.kafka.chatroom.domain.MessageMutationException; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.http.HttpStatus; -import org.springframework.http.ProblemDetail; -import org.springframework.web.bind.annotation.ControllerAdvice; -import org.springframework.web.bind.annotation.ExceptionHandler; -import org.springframework.web.server.ServerWebExchange; -import org.springframework.web.util.UriComponentsBuilder; - -import java.util.Date; - - -@ControllerAdvice -public class ChatroomControllerAdvice -{ - @Value("${server.context-path:/}") - String contextPath; - - @ExceptionHandler(MessageMutationException.class) - public final ProblemDetail handleException( - MessageMutationException e, - ServerWebExchange exchange, - UriComponentsBuilder uriComponentsBuilder) - { - final HttpStatus status = HttpStatus.BAD_REQUEST; - ProblemDetail problem = ProblemDetail.forStatus(status); - - problem.setProperty("timestamp", new Date()); - - problem.setProperty("requestId", exchange.getRequest().getId()); - - problem.setType(uriComponentsBuilder.replacePath(contextPath).path("/problem/message-mutation").build().toUri()); - StringBuilder stringBuilder = new StringBuilder(); - stringBuilder.append(status.getReasonPhrase()); - stringBuilder.append(" - "); - stringBuilder.append(e.getMessage()); - problem.setTitle(stringBuilder.toString()); - - stringBuilder.setLength(0); - stringBuilder.append("The existing message with user="); - stringBuilder.append(e.getExisting().getUser()); - stringBuilder.append(" and id="); - stringBuilder.append(e.getExisting().getId()); - stringBuilder.append(" cannot be mutated!"); - problem.setDetail(stringBuilder.toString()); - - problem.setProperty("mutatedMessage", e.getMutated()); - - problem.setProperty("existingMessage", e.getExisting()); - - return problem; - } -} diff --git a/src/main/java/de/juplo/kafka/chatroom/api/MessageTo.java b/src/main/java/de/juplo/kafka/chatroom/api/MessageTo.java deleted file mode 100644 index c7713865..00000000 --- a/src/main/java/de/juplo/kafka/chatroom/api/MessageTo.java +++ /dev/null @@ -1,30 +0,0 @@ -package de.juplo.kafka.chatroom.api; - -import de.juplo.kafka.chatroom.domain.Message; -import lombok.AllArgsConstructor; -import lombok.Data; - -import java.time.LocalDateTime; - - -@Data -@AllArgsConstructor -public class MessageTo -{ - private Long id; - private Long serialNumber; - private LocalDateTime timestamp; - private String user; - private String text; - - public static MessageTo from(Message message) - { - return - new MessageTo( - message.getId(), - message.getSerialNumber(), - message.getTimestamp(), - message.getUser(), - message.getText()); - } -} diff --git a/src/main/java/de/juplo/kafka/chatroom/domain/Chatroom.java b/src/main/java/de/juplo/kafka/chatroom/domain/Chatroom.java deleted file mode 100644 index 62d33f27..00000000 --- a/src/main/java/de/juplo/kafka/chatroom/domain/Chatroom.java +++ /dev/null @@ -1,90 +0,0 @@ -package de.juplo.kafka.chatroom.domain; - -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import lombok.Value; -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks; - -import java.time.LocalDateTime; -import java.util.*; -import java.util.stream.Stream; - - -@RequiredArgsConstructor -@Slf4j -public class Chatroom -{ - @Getter - private final UUID id; - @Getter - private final String name; - private final LinkedHashMap messages = new LinkedHashMap<>(); - private final Sinks.Many sink = Sinks.many().multicast().onBackpressureBuffer(); - - synchronized public Mono addMessage( - Long id, - LocalDateTime timestamp, - String user, - String text) - { - return persistMessage(id, timestamp, user, text) - .doOnNext(message -> sink.tryEmitNext(message).orThrow()); - } - - private Mono persistMessage( - Long id, - LocalDateTime timestamp, - String user, - String text) - { - Message message = new Message(id, (long)messages.size(), timestamp, user, text); - - MessageKey key = new MessageKey(user, id); - Message existing = messages.get(key); - if (existing != null) - { - log.info("Message with key {} already exists; {}", key, existing); - if (!message.equals(existing)) - throw new MessageMutationException(message, existing); - return Mono.empty(); - } - - messages.put(key, message); - return Mono - .fromSupplier(() -> message) - .log(); - } - - public Mono getMessage(String username, Long messageId) - { - return Mono.fromSupplier(() -> - { - MessageKey key = MessageKey.of(username, messageId); - return messages.get(key); - }); - } - - public Flux listen() - { - return sink.asFlux(); - } - - public Stream getMessages(long firstMessage) - { - return messages - .values() - .stream() - .filter(message -> message.getSerialNumber() >= firstMessage); - } - - - @Value(staticConstructor = "of") - static class MessageKey - { - String username; - Long messageId; - } -} diff --git a/src/main/java/de/juplo/kafka/chatroom/domain/Message.java b/src/main/java/de/juplo/kafka/chatroom/domain/Message.java deleted file mode 100644 index 84cddede..00000000 --- a/src/main/java/de/juplo/kafka/chatroom/domain/Message.java +++ /dev/null @@ -1,22 +0,0 @@ -package de.juplo.kafka.chatroom.domain; - -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import lombok.ToString; - -import java.time.LocalDateTime; - - -@RequiredArgsConstructor -@Getter -@EqualsAndHashCode -@ToString -public class Message -{ - private final Long id; - private final Long serialNumber; - private final LocalDateTime timestamp; - private final String user; - private final String text; -} diff --git a/src/main/java/de/juplo/kafka/chatroom/domain/MessageMutationException.java b/src/main/java/de/juplo/kafka/chatroom/domain/MessageMutationException.java deleted file mode 100644 index db4ebc24..00000000 --- a/src/main/java/de/juplo/kafka/chatroom/domain/MessageMutationException.java +++ /dev/null @@ -1,19 +0,0 @@ -package de.juplo.kafka.chatroom.domain; - -import lombok.Getter; - - -public class MessageMutationException extends RuntimeException -{ - @Getter - private final Message mutated; - @Getter - private final Message existing; - - public MessageMutationException(Message mutated, Message existing) - { - super("Messages are imutable!"); - this.mutated = mutated; - this.existing = existing; - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/ChatBackendApplicationTests.java b/src/test/java/de/juplo/kafka/chat/backend/ChatBackendApplicationTests.java new file mode 100644 index 00000000..9b23ad0f --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/ChatBackendApplicationTests.java @@ -0,0 +1,13 @@ +package de.juplo.kafka.chatroom; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +class ChatroomApplicationTests +{ + @Test + void contextLoads() + { + } +} diff --git a/src/test/java/de/juplo/kafka/chatroom/ChatroomApplicationTests.java b/src/test/java/de/juplo/kafka/chatroom/ChatroomApplicationTests.java deleted file mode 100644 index 9b23ad0f..00000000 --- a/src/test/java/de/juplo/kafka/chatroom/ChatroomApplicationTests.java +++ /dev/null @@ -1,13 +0,0 @@ -package de.juplo.kafka.chatroom; - -import org.junit.jupiter.api.Test; -import org.springframework.boot.test.context.SpringBootTest; - -@SpringBootTest -class ChatroomApplicationTests -{ - @Test - void contextLoads() - { - } -}