feat: An `IllegalArgumentException` is thrown, if different messages are added for...
authorKai Moritz <kai@juplo.de>
Mon, 26 Dec 2022 13:03:52 +0000 (14:03 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 15 Jan 2023 18:32:58 +0000 (19:32 +0100)
src/main/java/de/juplo/kafka/chatroom/api/ChatroomController.java
src/main/java/de/juplo/kafka/chatroom/api/MessageTo.java
src/main/java/de/juplo/kafka/chatroom/domain/Chatroom.java
src/main/java/de/juplo/kafka/chatroom/domain/Message.java

index da65a43..d613092 100644 (file)
@@ -8,9 +8,7 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.time.Clock;
-import java.time.Duration;
 import java.time.LocalDateTime;
-import java.time.LocalTime;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -45,21 +43,35 @@ public class ChatroomController
     return chatrooms.get(chatroomId);
   }
 
-  @PutMapping("post/{chatroomId}/{username}/{messageId}")
-  public Mono<MessageTo> post(
+  @PutMapping("put/{chatroomId}/{username}/{messageId}")
+  public Mono<MessageTo> put(
       @PathVariable UUID chatroomId,
       @PathVariable String username,
-      @PathVariable UUID messageId,
+      @PathVariable Long messageId,
       @RequestBody String text)
   {
+    Chatroom chatroom = chatrooms.get(chatroomId);
     return
-        chatrooms
-            .get(chatroomId)
+        chatroom
             .addMessage(
                 messageId,
                 LocalDateTime.now(clock),
                 username,
                 text)
+            .switchIfEmpty(chatroom.getMessage(username, messageId))
+            .map(message -> MessageTo.from(message));
+  }
+
+  @GetMapping("get/{chatroomId}/{username}/{messageId}")
+  public Mono<MessageTo> get(
+      @PathVariable UUID chatroomId,
+      @PathVariable String username,
+      @PathVariable Long messageId)
+  {
+    return
+        chatrooms
+            .get(chatroomId)
+            .getMessage(username, messageId)
             .map(message -> MessageTo.from(message));
   }
 
index 54ff7f7..c771386 100644 (file)
@@ -5,14 +5,13 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 
 import java.time.LocalDateTime;
-import java.util.UUID;
 
 
 @Data
 @AllArgsConstructor
 public class MessageTo
 {
-  private UUID id;
+  private Long id;
   private Long serialNumber;
   private LocalDateTime timestamp;
   private String user;
index f87a088..1d7ee2a 100644 (file)
@@ -2,15 +2,14 @@ package de.juplo.kafka.chatroom.domain;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import lombok.Value;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Sinks;
 
 import java.time.LocalDateTime;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
 import java.util.stream.Stream;
 
 
@@ -22,11 +21,11 @@ public class Chatroom
   private final UUID id;
   @Getter
   private final String name;
-  private final List<Message> messages = new LinkedList<>();
+  private final LinkedHashMap<MessageKey, Message> messages = new LinkedHashMap<>();
   private final Sinks.Many<Message> sink = Sinks.many().multicast().onBackpressureBuffer();
 
   synchronized public Mono<Message> addMessage(
-      UUID id,
+      Long id,
       LocalDateTime timestamp,
       String user,
       String text)
@@ -36,18 +35,38 @@ public class Chatroom
   }
 
   private Mono<Message> persistMessage(
-      UUID id,
+      Long id,
       LocalDateTime timestamp,
       String user,
       String text)
   {
     Message message = new Message(id, (long)messages.size(), timestamp, user, text);
-    messages.add(message);
+
+    MessageKey key = new MessageKey(user, id);
+    Message existing = messages.get(key);
+    if (existing != null)
+    {
+      log.info("Message with key {} already exists; {}", key, existing);
+      if (!message.equals(existing))
+        throw new IllegalArgumentException("Messages are imutable!");
+      return Mono.empty();
+    }
+
+    messages.put(key, message);
     return Mono
         .fromSupplier(() -> message)
         .log();
   }
 
+  public Mono<Message> getMessage(String username, Long messageId)
+  {
+    return Mono.fromSupplier(() ->
+    {
+      MessageKey key = MessageKey.of(username, messageId);
+      return messages.get(key);
+    });
+  }
+
   public Flux<Message> listen()
   {
     return sink.asFlux();
@@ -55,6 +74,17 @@ public class Chatroom
 
   public Stream<Message> getMessages(long firstMessage)
   {
-    return messages.stream().filter(message -> message.getSerialNumber() >= firstMessage);
+    return messages
+        .values()
+        .stream()
+        .filter(message -> message.getSerialNumber() >= firstMessage);
+  }
+
+
+  @Value(staticConstructor = "of")
+  static class MessageKey
+  {
+    String username;
+    Long messageId;
   }
 }
index d93dc1b..84cdded 100644 (file)
@@ -6,7 +6,6 @@ import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 
 import java.time.LocalDateTime;
-import java.util.UUID;
 
 
 @RequiredArgsConstructor
@@ -15,7 +14,7 @@ import java.util.UUID;
 @ToString
 public class Message
 {
-  private final UUID id;
+  private final Long id;
   private final Long serialNumber;
   private final LocalDateTime timestamp;
   private final String user;