WIP
authorKai Moritz <kai@juplo.de>
Wed, 13 Sep 2023 21:15:13 +0000 (23:15 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 13 Sep 2023 21:15:13 +0000 (23:15 +0200)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java

index a7899e4..f20f7c2 100644 (file)
@@ -175,7 +175,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
         if (loadInProgress)
         {
-          loadChatRoom(records);
+          loadChatRoomData(records);
 
           if (isLoadingCompleted())
           {
@@ -203,7 +203,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     log.info("Exiting normally");
   }
 
-  private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
+  private void loadChatRoomData(ConsumerRecords<String, AbstractMessageTo> records)
   {
     for (ConsumerRecord<String, AbstractMessageTo> record : records)
     {
index 5a1d186..a98e46e 100644 (file)
@@ -28,7 +28,7 @@ public class InfoChannel implements Runnable
   private final Consumer<String, AbstractMessageTo> consumer;
   private final int numShards;
   private final long[] startOffset;
-  private final long[] currentOffset;
+  private final long[] nextOffset;
   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
   private final DataChannel dataChannel;
 
@@ -53,15 +53,10 @@ public class InfoChannel implements Runnable
         .partitionsFor(topic)
         .size();
     this.startOffset = new long[numShards];
-    this.currentOffset = new long[numShards];
+    this.nextOffset = new long[numShards];
     IntStream
         .range(0, numShards)
-        .forEach(partition -> this.currentOffset[partition] = -1l);
-    consumer
-        .endOffsets(consumer.assignment())
-        .entrySet()
-        .stream()
-        .forEach(entry -> this.startOffset[entry.getKey().partition()] = entry.getValue());
+        .forEach(partition -> this.nextOffset[partition] = -1l);
 
     this.dataChannel = dataChannel;
   }
@@ -71,7 +66,7 @@ public class InfoChannel implements Runnable
   {
     return IntStream
         .range(0, numShards)
-        .anyMatch(partition -> currentOffset[partition] < startOffset[partition]);
+        .anyMatch(partition -> nextOffset[partition] < startOffset[partition]);
   }
 
   Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
@@ -117,6 +112,15 @@ public class InfoChannel implements Runnable
   {
     running = true;
 
+    consumer
+        .endOffsets(consumer.assignment())
+        .entrySet()
+        .stream()
+        .forEach(entry -> this.startOffset[entry.getKey().partition()] = entry.getValue());
+    IntStream
+        .range(0, numShards)
+        .forEach(partition -> this.nextOffset[partition] = 0l);
+
     while (running)
     {
       try
@@ -155,7 +159,7 @@ public class InfoChannel implements Runnable
               record.value());
       }
 
-      startOffset[record.partition()] = record.offset() + 1;
+      nextOffset[record.partition()] = record.offset() + 1;
     }
   }
 
index 6bf3a74..cf097f8 100644 (file)
@@ -80,16 +80,21 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
   @BeforeAll
   public static void sendAndLoadStoredData(@Autowired KafkaTemplate<String, String> messageTemplate)
   {
-    send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"name\": \"FOO\" }", "event_chatroom_created");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
+    send(messageTemplate, INFO_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"name\": \"FOO\" }", "event_chatroom_created");
+    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
+    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
+    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
+    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
   }
 
-  static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
+  static void send(
+      KafkaTemplate<String, String> kafkaTemplate,
+      String topic,
+      String key,
+      String value,
+      String typeId)
   {
-    ProducerRecord<String, String> record = new ProducerRecord<>(DATA_TOPIC, key, value);
+    ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
     record.headers().add("__TypeId__", typeId.getBytes());
     SendResult<String, String> result = kafkaTemplate.send(record).join();
     log.info(