WIP:ALIGN
authorKai Moritz <kai@juplo.de>
Mon, 11 Sep 2023 21:29:16 +0000 (23:29 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 12 Sep 2023 21:35:20 +0000 (23:35 +0200)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java

index 9c3fd63..4c9abd1 100644 (file)
@@ -139,6 +139,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
       consumer.seek(topicPartition, nextOffset[partition]);
     });
+
+    consumer.resume(partitions);
   }
 
   @Override
@@ -174,17 +176,22 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
         if (loadInProgress)
         {
-          loadChatRooms(records);
+          loadChatRoom(records);
 
           if (isLoadingCompleted())
           {
             log.info("Loading of messages completed! Pausing all owned partitions...");
+            pauseAllOwnedPartions();
+            log.info("Resuming normal operations...");
             loadInProgress = false;
           }
         }
         else
         {
-          createChatRooms(records);
+          if (!records.isEmpty())
+          {
+            throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
+          }
         }
       }
       catch (WakeupException e)
@@ -197,7 +204,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     log.info("Exiting normally");
   }
 
-  private void loadChatRooms(ConsumerRecords<String, AbstractMessageTo> records)
+  private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
   {
     for (ConsumerRecord<String, AbstractMessageTo> record : records)
     {
@@ -235,35 +242,6 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     }
   }
 
-
-  private void createChatRooms(ConsumerRecords<String, AbstractMessageTo> records)
-  {
-    for (ConsumerRecord<String, AbstractMessageTo> record : records)
-    {
-      UUID chatRoomId = UUID.fromString(record.key());
-
-      switch (record.value().getType())
-      {
-        case COMMAND_CREATE_CHATROOM:
-          log.info("Received create-request for chat room: {}", chatRoomId);
-          createChatRoom(
-              chatRoomId,
-              (CommandCreateChatRoomTo) record.value(),
-              record.partition());
-          break;
-
-        default:
-          log.debug(
-              "Ignoring message for chat-room {} with offset {}: {}",
-              chatRoomId,
-              record.offset(),
-              record.value());
-      }
-
-      nextOffset[record.partition()] = record.offset() + 1;
-    }
-  }
-
   private void createChatRoom(
       UUID chatRoomId,
       CommandCreateChatRoomTo createChatRoomRequestTo,
@@ -325,6 +303,15 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
         .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
   }
 
+  private void pauseAllOwnedPartions()
+  {
+    consumer.pause(IntStream
+        .range(0, numShards)
+        .filter(shard -> isShardOwned[shard])
+        .mapToObj(shard -> new TopicPartition(topic, shard))
+        .toList());
+  }
+
 
   private void putChatRoom(
       UUID chatRoomId,