WIP:ALIGN
authorKai Moritz <kai@juplo.de>
Mon, 11 Sep 2023 21:23:16 +0000 (23:23 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 13 Sep 2023 20:44:25 +0000 (22:44 +0200)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java

index 738901a..9c3fd63 100644 (file)
@@ -77,42 +77,6 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
 
 
-  Mono<ChatRoomInfo> sendCreateChatRoomRequest(
-      UUID chatRoomId,
-      String name)
-  {
-    CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
-    return Mono.create(sink ->
-    {
-      ProducerRecord<String, AbstractMessageTo> record =
-          new ProducerRecord<>(
-              topic,
-              chatRoomId.toString(),
-              createChatRoomRequestTo);
-
-      producer.send(record, ((metadata, exception) ->
-      {
-        if (metadata != null)
-        {
-          log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
-          ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition());
-          createChatRoom(chatRoomInfo);
-          sink.success(chatRoomInfo);
-        }
-        else
-        {
-          // On send-failure
-          log.error(
-              "Could not send create-request for chat room (id={}, name={}): {}",
-              chatRoomId,
-              name,
-              exception);
-          sink.error(exception);
-        }
-      }));
-    });
-  }
-
   Mono<Message> sendChatMessage(
       UUID chatRoomId,
       Message.MessageKey key,
@@ -175,8 +139,6 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
       consumer.seek(topicPartition, nextOffset[partition]);
     });
-
-    consumer.resume(partitions);
   }
 
   @Override
@@ -207,27 +169,22 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     {
       try
       {
-        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
+        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
         log.info("Fetched {} messages", records.count());
 
         if (loadInProgress)
         {
-          loadChatRoom(records);
+          loadChatRooms(records);
 
           if (isLoadingCompleted())
           {
             log.info("Loading of messages completed! Pausing all owned partitions...");
-            pauseAllOwnedPartions();
-            log.info("Resuming normal operations...");
             loadInProgress = false;
           }
         }
         else
         {
-          if (!records.isEmpty())
-          {
-            throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
-          }
+          createChatRooms(records);
         }
       }
       catch (WakeupException e)
@@ -240,7 +197,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     log.info("Exiting normally");
   }
 
-  private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
+  private void loadChatRooms(ConsumerRecords<String, AbstractMessageTo> records)
   {
     for (ConsumerRecord<String, AbstractMessageTo> record : records)
     {
@@ -278,6 +235,35 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     }
   }
 
+
+  private void createChatRooms(ConsumerRecords<String, AbstractMessageTo> records)
+  {
+    for (ConsumerRecord<String, AbstractMessageTo> record : records)
+    {
+      UUID chatRoomId = UUID.fromString(record.key());
+
+      switch (record.value().getType())
+      {
+        case COMMAND_CREATE_CHATROOM:
+          log.info("Received create-request for chat room: {}", chatRoomId);
+          createChatRoom(
+              chatRoomId,
+              (CommandCreateChatRoomTo) record.value(),
+              record.partition());
+          break;
+
+        default:
+          log.debug(
+              "Ignoring message for chat-room {} with offset {}: {}",
+              chatRoomId,
+              record.offset(),
+              record.value());
+      }
+
+      nextOffset[record.partition()] = record.offset() + 1;
+    }
+  }
+
   private void createChatRoom(
       UUID chatRoomId,
       CommandCreateChatRoomTo createChatRoomRequestTo,
@@ -339,15 +325,6 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
         .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
   }
 
-  private void pauseAllOwnedPartions()
-  {
-    consumer.pause(IntStream
-        .range(0, numShards)
-        .filter(shard -> isShardOwned[shard])
-        .mapToObj(shard -> new TopicPartition(topic, shard))
-        .toList());
-  }
-
 
   private void putChatRoom(
       UUID chatRoomId,