From 91782ddca4ed528acc89c6d636367041ed49872c Mon Sep 17 00:00:00 2001
From: Kai Moritz <kai@juplo.de>
Date: Wed, 13 Sep 2023 23:15:13 +0200
Subject: [PATCH] WIP

---
 .../implementation/kafka/DataChannel.java     |  4 ++--
 .../implementation/kafka/InfoChannel.java     | 24 +++++++++++--------
 .../kafka/KafkaChatHomeServiceTest.java       | 19 +++++++++------
 3 files changed, 28 insertions(+), 19 deletions(-)

diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
index a7899e43..f20f7c23 100644
--- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
+++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
@@ -175,7 +175,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
         if (loadInProgress)
         {
-          loadChatRoom(records);
+          loadChatRoomData(records);
 
           if (isLoadingCompleted())
           {
@@ -203,7 +203,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     log.info("Exiting normally");
   }
 
-  private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
+  private void loadChatRoomData(ConsumerRecords<String, AbstractMessageTo> records)
   {
     for (ConsumerRecord<String, AbstractMessageTo> record : records)
     {
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java
index 5a1d186d..a98e46ea 100644
--- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java
+++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java
@@ -28,7 +28,7 @@ public class InfoChannel implements Runnable
   private final Consumer<String, AbstractMessageTo> consumer;
   private final int numShards;
   private final long[] startOffset;
-  private final long[] currentOffset;
+  private final long[] nextOffset;
   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
   private final DataChannel dataChannel;
 
@@ -53,15 +53,10 @@ public class InfoChannel implements Runnable
         .partitionsFor(topic)
         .size();
     this.startOffset = new long[numShards];
-    this.currentOffset = new long[numShards];
+    this.nextOffset = new long[numShards];
     IntStream
         .range(0, numShards)
-        .forEach(partition -> this.currentOffset[partition] = -1l);
-    consumer
-        .endOffsets(consumer.assignment())
-        .entrySet()
-        .stream()
-        .forEach(entry -> this.startOffset[entry.getKey().partition()] = entry.getValue());
+        .forEach(partition -> this.nextOffset[partition] = -1l);
 
     this.dataChannel = dataChannel;
   }
@@ -71,7 +66,7 @@ public class InfoChannel implements Runnable
   {
     return IntStream
         .range(0, numShards)
-        .anyMatch(partition -> currentOffset[partition] < startOffset[partition]);
+        .anyMatch(partition -> nextOffset[partition] < startOffset[partition]);
   }
 
   Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
@@ -117,6 +112,15 @@ public class InfoChannel implements Runnable
   {
     running = true;
 
+    consumer
+        .endOffsets(consumer.assignment())
+        .entrySet()
+        .stream()
+        .forEach(entry -> this.startOffset[entry.getKey().partition()] = entry.getValue());
+    IntStream
+        .range(0, numShards)
+        .forEach(partition -> this.nextOffset[partition] = 0l);
+
     while (running)
     {
       try
@@ -155,7 +159,7 @@ public class InfoChannel implements Runnable
               record.value());
       }
 
-      startOffset[record.partition()] = record.offset() + 1;
+      nextOffset[record.partition()] = record.offset() + 1;
     }
   }
 
diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
index 6bf3a742..cf097f84 100644
--- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
+++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
@@ -80,16 +80,21 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
   @BeforeAll
   public static void sendAndLoadStoredData(@Autowired KafkaTemplate<String, String> messageTemplate)
   {
-    send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"name\": \"FOO\" }", "event_chatroom_created");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
+    send(messageTemplate, INFO_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"name\": \"FOO\" }", "event_chatroom_created");
+    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
+    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
+    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
+    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
   }
 
-  static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
+  static void send(
+      KafkaTemplate<String, String> kafkaTemplate,
+      String topic,
+      String key,
+      String value,
+      String typeId)
   {
-    ProducerRecord<String, String> record = new ProducerRecord<>(DATA_TOPIC, key, value);
+    ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
     record.headers().add("__TypeId__", typeId.getBytes());
     SendResult<String, String> result = kafkaTemplate.send(record).join();
     log.info(
-- 
2.20.1