From fff11a80b8f917dfea8f08e8ba894e01571f86c6 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 11 Sep 2023 23:29:16 +0200 Subject: [PATCH] WIP:ALIGN --- .../implementation/kafka/DataChannel.java | 51 +++++++------------ 1 file changed, 19 insertions(+), 32 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 9c3fd632..4c9abd1e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -139,6 +139,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener consumer.seek(topicPartition, nextOffset[partition]); }); + + consumer.resume(partitions); } @Override @@ -174,17 +176,22 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener if (loadInProgress) { - loadChatRooms(records); + loadChatRoom(records); if (isLoadingCompleted()) { log.info("Loading of messages completed! Pausing all owned partitions..."); + pauseAllOwnedPartions(); + log.info("Resuming normal operations..."); loadInProgress = false; } } else { - createChatRooms(records); + if (!records.isEmpty()) + { + throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!"); + } } } catch (WakeupException e) @@ -197,7 +204,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener log.info("Exiting normally"); } - private void loadChatRooms(ConsumerRecords records) + private void loadChatRoom(ConsumerRecords records) { for (ConsumerRecord record : records) { @@ -235,35 +242,6 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener } } - - private void createChatRooms(ConsumerRecords records) - { - for (ConsumerRecord record : records) - { - UUID chatRoomId = UUID.fromString(record.key()); - - switch (record.value().getType()) - { - case COMMAND_CREATE_CHATROOM: - log.info("Received create-request for chat room: {}", chatRoomId); - createChatRoom( - chatRoomId, - (CommandCreateChatRoomTo) record.value(), - record.partition()); - break; - - default: - log.debug( - "Ignoring message for chat-room {} with offset {}: {}", - chatRoomId, - record.offset(), - record.value()); - } - - nextOffset[record.partition()] = record.offset() + 1; - } - } - private void createChatRoom( UUID chatRoomId, CommandCreateChatRoomTo createChatRoomRequestTo, @@ -325,6 +303,15 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]); } + private void pauseAllOwnedPartions() + { + consumer.pause(IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .mapToObj(shard -> new TopicPartition(topic, shard)) + .toList()); + } + private void putChatRoom( UUID chatRoomId, -- 2.20.1