refactor: separated message- and record- (aka offset-) handling
authorKai Moritz <kai@juplo.de>
Sat, 3 Feb 2024 14:20:25 +0000 (15:20 +0100)
committerKai Moritz <kai@juplo.de>
Tue, 20 Feb 2024 09:28:35 +0000 (10:28 +0100)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java

index 3429d40..3fe15c4 100644 (file)
@@ -196,7 +196,11 @@ public class InfoChannel implements Runnable
       {
         ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
         log.debug("Fetched {} messages", records.count());
-        handleMessages(records);
+        for (ConsumerRecord<String, AbstractMessageTo> record : records)
+        {
+          handleMessage(record);
+          updateNextOffset(record.partition(), record.offset() + 1);
+        }
       }
       catch (WakeupException e)
       {
@@ -208,47 +212,47 @@ public class InfoChannel implements Runnable
     log.info("Exiting normally");
   }
 
-  private void handleMessages(ConsumerRecords<String, AbstractMessageTo> records)
+  private void updateNextOffset(int partition, long nextOffset)
   {
-    for (ConsumerRecord<String, AbstractMessageTo> record : records)
-    {
-      switch (record.value().getType())
-      {
-        case EVENT_CHATROOM_CREATED:
-          EventChatRoomCreated eventChatRoomCreated =
-              (EventChatRoomCreated) record.value();
-          createChatRoom(eventChatRoomCreated.toChatRoomInfo());
-          break;
-
-        case EVENT_SHARD_ASSIGNED:
-          EventShardAssigned eventShardAssigned =
-              (EventShardAssigned) record.value();
-          log.info(
-              "Shard {} was assigned to {}",
-              eventShardAssigned.getShard(),
-              eventShardAssigned.getUri());
-          shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri();
-          break;
-
-        case EVENT_SHARD_REVOKED:
-          EventShardRevoked eventShardRevoked =
-              (EventShardRevoked) record.value();
-          log.info(
-              "Shard {} was revoked from {}",
-              eventShardRevoked.getShard(),
-              eventShardRevoked.getUri());
-          shardOwners[eventShardRevoked.getShard()] = null;
-          break;
-
-        default:
-          log.debug(
-              "Ignoring message for key={} with offset={}: {}",
-              record.key(),
-              record.offset(),
-              record.value());
-      }
+    this.nextOffset[partition] = nextOffset;
+  }
 
-      nextOffset[record.partition()] = record.offset() + 1;
+  private void handleMessage(ConsumerRecord<String, AbstractMessageTo> record)
+  {
+    switch (record.value().getType())
+    {
+      case EVENT_CHATROOM_CREATED:
+        EventChatRoomCreated eventChatRoomCreated =
+            (EventChatRoomCreated) record.value();
+        createChatRoom(eventChatRoomCreated.toChatRoomInfo());
+        break;
+
+      case EVENT_SHARD_ASSIGNED:
+        EventShardAssigned eventShardAssigned =
+            (EventShardAssigned) record.value();
+        log.info(
+            "Shard {} was assigned to {}",
+            eventShardAssigned.getShard(),
+            eventShardAssigned.getUri());
+        shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri();
+        break;
+
+      case EVENT_SHARD_REVOKED:
+        EventShardRevoked eventShardRevoked =
+            (EventShardRevoked) record.value();
+        log.info(
+            "Shard {} was revoked from {}",
+            eventShardRevoked.getShard(),
+            eventShardRevoked.getUri());
+        shardOwners[eventShardRevoked.getShard()] = null;
+        break;
+
+      default:
+        log.debug(
+            "Ignoring message for key={} with offset={}: {}",
+            record.key(),
+            record.offset(),
+            record.value());
     }
   }