WIP
authorKai Moritz <kai@juplo.de>
Sun, 26 Feb 2023 17:48:39 +0000 (18:48 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 26 Feb 2023 17:48:39 +0000 (18:48 +0100)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java

diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java
new file mode 100644 (file)
index 0000000..365bb5e
--- /dev/null
@@ -0,0 +1,62 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+
+@RequiredArgsConstructor
+@Slf4j
+class ChatHomeLoader
+{
+  private final long offsetOfFirstNewMessage;
+  private final ZoneId zoneId;
+  private final Map<UUID, KafkaChatRoomService> kafkaChatRoomServiceMap = new HashMap<>();
+
+
+  /**
+   * Rebuilds the state of the {@link KafkaChatHomeService} by consuming
+   * all messages, that belong to the partition, that defines the shard this
+   * service represents.
+   * The loader signals, that its work is done, if the given end offset is reached.
+   * @param record A record, that represents a message from one of the {@link ChatRoom}s, that belong to the partition.
+   * @return {@code true}, if all messages are consumed.
+   */
+  boolean handleMessage(ConsumerRecord<UUID, MessageTo> record)
+  {
+    if (record.offset() >= offsetOfFirstNewMessage)
+    {
+      // All messages consumed: DONE!
+      log.debug("I");
+      return true;
+    }
+
+    Instant timestamp = Instant.ofEpochMilli(record.timestamp());
+    LocalDateTime time = LocalDateTime.ofInstant(timestamp, zoneId);
+
+    KafkaChatRoomService service = kafkaChatRoomServiceMap
+        .computeIfAbsent(record.key(), key ->
+        {
+        });
+
+    service.addMessage(new Message(
+        Message.MessageKey.of(
+            record.value().getUser(),
+            record.value().getId()),
+        record.offset(),
+        time,
+        record.value().getText()
+        ));
+
+    return false;
+  }
+}
index 7e55473..77fe642 100644 (file)
@@ -32,6 +32,7 @@ class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy
   @Override
   public Mono<Message> handleMessage(Message.MessageKey key, LocalDateTime timestamp, String text)
   {
+    // TODO
     return null;
   }
 }
index b2ed284..a5d63fd 100644 (file)
@@ -17,7 +17,7 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
 {
   private final Consumer<String, MessageTo> consumer;
   private final String topic;
-  private final long[] offsets;
+  // private final long[] offsets; Erst mal immer alles neu einlesen
   private final Map<UUID, ChatRoom>[] chatrooms;
 
 
@@ -29,14 +29,11 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
     log.debug("Creating KafkaChatHomeService");
     this.consumer = consumer;
     this.topic = topic;
-    this.offsets = new long[numShards];
-    this.chatrooms = new Map<>[numShards];
-    for (int i=0; i< numShards; i++)
-    {
-      this.offsets[i] = 0l;
-      this.chatrooms[i] = chat
-    }
-    this.chatrooms = new Map[numShards];
+    // this.offsets = new long[numShards];
+    // for (int i=0; i< numShards; i++)
+    // {
+    //   this.offsets[i] = 0l;
+    // }
     this.chatrooms = new Map[numShards];
   }
 
@@ -53,13 +50,18 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
       }
 
       int partition = tp.partition();
-      long unseenOffset = offsets[partition];
+      long unseenOffset = 0; // offsets[partition];
 
       log.info(
           "Loading messages from partition {}: start-offset={} -> current-offset={}",
           partition,
           unseenOffset,
           currentOffset);
+
+      consumer.seek(tp, unseenOffset);
+      chatrooms[partition]
+          .values()
+          .stream()
       handlers[partition] = new ChatRoomLoadingMessageHandlingStrategy(tp, currentOffset, unseenOffset);
     });
   }