WIP
authorKai Moritz <kai@juplo.de>
Sun, 26 Feb 2023 18:13:25 +0000 (19:13 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 26 Feb 2023 18:13:25 +0000 (19:13 +0100)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java

index 15d968a..465775f 100644 (file)
@@ -2,9 +2,12 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.TopicPartition;
 
 import java.time.Instant;
 import java.time.LocalDateTime;
@@ -18,8 +21,10 @@ import java.util.UUID;
 @Slf4j
 class ChatHomeLoader
 {
+  private final Producer<String, MessageTo> producer;
   private final long offsetOfFirstUnseenMessage;
   private final ZoneId zoneId;
+  @Getter
   private final Map<UUID, KafkaChatRoomService> kafkaChatRoomServiceMap = new HashMap<>();
 
 
@@ -33,6 +38,8 @@ class ChatHomeLoader
    */
   boolean handleMessage(ConsumerRecord<UUID, MessageTo> record)
   {
+    TopicPartition topicPartition =
+        new TopicPartition(record.topic(), record.partition());
     Message.MessageKey messageKey = Message.MessageKey.of(
         record.value().getUser(),
         record.value().getId());
@@ -41,10 +48,9 @@ class ChatHomeLoader
     {
       // All messages consumed: DONE!
       log.trace(
-          "Ignoring unseen message {}: topic={}, partition={}, offset={}",
+          "Ignoring unseen message {} on {}, offset={}",
           messageKey,
-          record.topic(),
-          record.partition(),
+          topicPartition,
           record.offset());
       return true;
     }
@@ -54,15 +60,13 @@ class ChatHomeLoader
 
     KafkaChatRoomService service = kafkaChatRoomServiceMap
         .computeIfAbsent(record.key(), key ->
-        {
-        });
+            new KafkaChatRoomService(producer, topicPartition));
 
     service.addMessage(new Message(
         messageKey,
         record.offset(),
         time,
-        record.value().getText()
-        ));
+        record.value().getText()));
 
     return false;
   }
index c4737a1..37c4e50 100644 (file)
@@ -10,9 +10,7 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.time.LocalDateTime;
-import java.time.ZoneOffset;
 import java.util.LinkedHashMap;
-import java.util.UUID;
 
 
 @Slf4j
@@ -20,8 +18,6 @@ public class KafkaChatRoomService implements ChatRoomService
 {
   private final Producer<String, MessageTo> producer;
   private final TopicPartition tp;
-  private final UUID chatRoomId;
-  private final ZoneOffset zoneOffset;
 
   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
 
@@ -30,14 +26,10 @@ public class KafkaChatRoomService implements ChatRoomService
 
   public KafkaChatRoomService(
       Producer<String, MessageTo> producer,
-      TopicPartition tp,
-      UUID chatRoomId,
-      ZoneOffset zoneOffset)
+      TopicPartition tp)
   {
     this.producer = producer;
     this.tp = tp;
-    this.chatRoomId = chatRoomId;
-    this.zoneOffset = zoneOffset;
     this.strategy = new ChatroomInactiveMessageHandlingStrategy(tp);
   }