From: Kai Moritz Date: Mon, 26 Dec 2022 13:03:52 +0000 (+0100) Subject: feat: An `IllegalArgumentException` is thrown, if different messages are added for... X-Git-Tag: wip~101 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=e2e27bebf17de850b8f8550aea61794e239daccd;p=demos%2Fkafka%2Fchat feat: An `IllegalArgumentException` is thrown, if different messages are added for the same `MessageKey` --- diff --git a/src/main/java/de/juplo/kafka/chatroom/api/ChatroomController.java b/src/main/java/de/juplo/kafka/chatroom/api/ChatroomController.java index da65a439..d6130927 100644 --- a/src/main/java/de/juplo/kafka/chatroom/api/ChatroomController.java +++ b/src/main/java/de/juplo/kafka/chatroom/api/ChatroomController.java @@ -8,9 +8,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.Clock; -import java.time.Duration; import java.time.LocalDateTime; -import java.time.LocalTime; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -45,21 +43,35 @@ public class ChatroomController return chatrooms.get(chatroomId); } - @PutMapping("post/{chatroomId}/{username}/{messageId}") - public Mono post( + @PutMapping("put/{chatroomId}/{username}/{messageId}") + public Mono put( @PathVariable UUID chatroomId, @PathVariable String username, - @PathVariable UUID messageId, + @PathVariable Long messageId, @RequestBody String text) { + Chatroom chatroom = chatrooms.get(chatroomId); return - chatrooms - .get(chatroomId) + chatroom .addMessage( messageId, LocalDateTime.now(clock), username, text) + .switchIfEmpty(chatroom.getMessage(username, messageId)) + .map(message -> MessageTo.from(message)); + } + + @GetMapping("get/{chatroomId}/{username}/{messageId}") + public Mono get( + @PathVariable UUID chatroomId, + @PathVariable String username, + @PathVariable Long messageId) + { + return + chatrooms + .get(chatroomId) + .getMessage(username, messageId) .map(message -> MessageTo.from(message)); } diff --git a/src/main/java/de/juplo/kafka/chatroom/api/MessageTo.java b/src/main/java/de/juplo/kafka/chatroom/api/MessageTo.java index 54ff7f70..c7713865 100644 --- a/src/main/java/de/juplo/kafka/chatroom/api/MessageTo.java +++ b/src/main/java/de/juplo/kafka/chatroom/api/MessageTo.java @@ -5,14 +5,13 @@ import lombok.AllArgsConstructor; import lombok.Data; import java.time.LocalDateTime; -import java.util.UUID; @Data @AllArgsConstructor public class MessageTo { - private UUID id; + private Long id; private Long serialNumber; private LocalDateTime timestamp; private String user; diff --git a/src/main/java/de/juplo/kafka/chatroom/domain/Chatroom.java b/src/main/java/de/juplo/kafka/chatroom/domain/Chatroom.java index f87a0883..1d7ee2af 100644 --- a/src/main/java/de/juplo/kafka/chatroom/domain/Chatroom.java +++ b/src/main/java/de/juplo/kafka/chatroom/domain/Chatroom.java @@ -2,15 +2,14 @@ package de.juplo.kafka.chatroom.domain; import lombok.Getter; import lombok.RequiredArgsConstructor; +import lombok.Value; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import java.time.LocalDateTime; -import java.util.LinkedList; -import java.util.List; -import java.util.UUID; +import java.util.*; import java.util.stream.Stream; @@ -22,11 +21,11 @@ public class Chatroom private final UUID id; @Getter private final String name; - private final List messages = new LinkedList<>(); + private final LinkedHashMap messages = new LinkedHashMap<>(); private final Sinks.Many sink = Sinks.many().multicast().onBackpressureBuffer(); synchronized public Mono addMessage( - UUID id, + Long id, LocalDateTime timestamp, String user, String text) @@ -36,18 +35,38 @@ public class Chatroom } private Mono persistMessage( - UUID id, + Long id, LocalDateTime timestamp, String user, String text) { Message message = new Message(id, (long)messages.size(), timestamp, user, text); - messages.add(message); + + MessageKey key = new MessageKey(user, id); + Message existing = messages.get(key); + if (existing != null) + { + log.info("Message with key {} already exists; {}", key, existing); + if (!message.equals(existing)) + throw new IllegalArgumentException("Messages are imutable!"); + return Mono.empty(); + } + + messages.put(key, message); return Mono .fromSupplier(() -> message) .log(); } + public Mono getMessage(String username, Long messageId) + { + return Mono.fromSupplier(() -> + { + MessageKey key = MessageKey.of(username, messageId); + return messages.get(key); + }); + } + public Flux listen() { return sink.asFlux(); @@ -55,6 +74,17 @@ public class Chatroom public Stream getMessages(long firstMessage) { - return messages.stream().filter(message -> message.getSerialNumber() >= firstMessage); + return messages + .values() + .stream() + .filter(message -> message.getSerialNumber() >= firstMessage); + } + + + @Value(staticConstructor = "of") + static class MessageKey + { + String username; + Long messageId; } } diff --git a/src/main/java/de/juplo/kafka/chatroom/domain/Message.java b/src/main/java/de/juplo/kafka/chatroom/domain/Message.java index d93dc1be..84cddede 100644 --- a/src/main/java/de/juplo/kafka/chatroom/domain/Message.java +++ b/src/main/java/de/juplo/kafka/chatroom/domain/Message.java @@ -6,7 +6,6 @@ import lombok.RequiredArgsConstructor; import lombok.ToString; import java.time.LocalDateTime; -import java.util.UUID; @RequiredArgsConstructor @@ -15,7 +14,7 @@ import java.util.UUID; @ToString public class Message { - private final UUID id; + private final Long id; private final Long serialNumber; private final LocalDateTime timestamp; private final String user;