WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatHomeLoader.java
index 365bb5e..465775f 100644 (file)
@@ -2,9 +2,12 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.TopicPartition;
 
 import java.time.Instant;
 import java.time.LocalDateTime;
@@ -18,8 +21,10 @@ import java.util.UUID;
 @Slf4j
 class ChatHomeLoader
 {
-  private final long offsetOfFirstNewMessage;
+  private final Producer<String, MessageTo> producer;
+  private final long offsetOfFirstUnseenMessage;
   private final ZoneId zoneId;
+  @Getter
   private final Map<UUID, KafkaChatRoomService> kafkaChatRoomServiceMap = new HashMap<>();
 
 
@@ -33,10 +38,20 @@ class ChatHomeLoader
    */
   boolean handleMessage(ConsumerRecord<UUID, MessageTo> record)
   {
-    if (record.offset() >= offsetOfFirstNewMessage)
+    TopicPartition topicPartition =
+        new TopicPartition(record.topic(), record.partition());
+    Message.MessageKey messageKey = Message.MessageKey.of(
+        record.value().getUser(),
+        record.value().getId());
+
+    if (record.offset() >= offsetOfFirstUnseenMessage)
     {
       // All messages consumed: DONE!
-      log.debug("I");
+      log.trace(
+          "Ignoring unseen message {} on {}, offset={}",
+          messageKey,
+          topicPartition,
+          record.offset());
       return true;
     }
 
@@ -45,17 +60,13 @@ class ChatHomeLoader
 
     KafkaChatRoomService service = kafkaChatRoomServiceMap
         .computeIfAbsent(record.key(), key ->
-        {
-        });
+            new KafkaChatRoomService(producer, topicPartition));
 
     service.addMessage(new Message(
-        Message.MessageKey.of(
-            record.value().getUser(),
-            record.value().getId()),
+        messageKey,
         record.offset(),
         time,
-        record.value().getText()
-        ));
+        record.value().getText()));
 
     return false;
   }