WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatRoomService.java
index 91b5031..37c4e50 100644 (file)
@@ -10,9 +10,7 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.time.LocalDateTime;
-import java.time.ZoneOffset;
 import java.util.LinkedHashMap;
-import java.util.UUID;
 
 
 @Slf4j
@@ -20,8 +18,6 @@ public class KafkaChatRoomService implements ChatRoomService
 {
   private final Producer<String, MessageTo> producer;
   private final TopicPartition tp;
-  private final UUID chatRoomId;
-  private final ZoneOffset zoneOffset;
 
   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
 
@@ -30,14 +26,10 @@ public class KafkaChatRoomService implements ChatRoomService
 
   public KafkaChatRoomService(
       Producer<String, MessageTo> producer,
-      TopicPartition tp,
-      UUID chatRoomId,
-      ZoneOffset zoneOffset)
+      TopicPartition tp)
   {
     this.producer = producer;
     this.tp = tp;
-    this.chatRoomId = chatRoomId;
-    this.zoneOffset = zoneOffset;
     this.strategy = new ChatroomInactiveMessageHandlingStrategy(tp);
   }
 
@@ -48,9 +40,13 @@ public class KafkaChatRoomService implements ChatRoomService
     LocalDateTime timestamp,
     String text)
   {
-    return strategy.persistMessage(key, timestamp, text);
+    return strategy.handleMessage(key, timestamp, text);
   }
 
+  /**
+   * {@code synchronized} ist nicht nötig, da Aufruf immer indirekt über
+   * {@link #persistMessage(Message.MessageKey, LocalDateTime, String)}
+   */
   protected void addMessage(Message message) throws MessageMutationException
   {
     Message existingMessage = messages.get(message.getKey());