From 82b2820de4469cb4fd804614f86f9081da366091 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 11 Sep 2023 23:23:16 +0200 Subject: [PATCH] WIP:ALIGN --- .../implementation/kafka/DataChannel.java | 89 +++++++------------ 1 file changed, 33 insertions(+), 56 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 738901aa..9c3fd632 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -77,42 +77,6 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener - Mono sendCreateChatRoomRequest( - UUID chatRoomId, - String name) - { - CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name); - return Mono.create(sink -> - { - ProducerRecord record = - new ProducerRecord<>( - topic, - chatRoomId.toString(), - createChatRoomRequestTo); - - producer.send(record, ((metadata, exception) -> - { - if (metadata != null) - { - log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo); - ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition()); - createChatRoom(chatRoomInfo); - sink.success(chatRoomInfo); - } - else - { - // On send-failure - log.error( - "Could not send create-request for chat room (id={}, name={}): {}", - chatRoomId, - name, - exception); - sink.error(exception); - } - })); - }); - } - Mono sendChatMessage( UUID chatRoomId, Message.MessageKey key, @@ -175,8 +139,6 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener consumer.seek(topicPartition, nextOffset[partition]); }); - - consumer.resume(partitions); } @Override @@ -207,27 +169,22 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); log.info("Fetched {} messages", records.count()); if (loadInProgress) { - loadChatRoom(records); + loadChatRooms(records); if (isLoadingCompleted()) { log.info("Loading of messages completed! Pausing all owned partitions..."); - pauseAllOwnedPartions(); - log.info("Resuming normal operations..."); loadInProgress = false; } } else { - if (!records.isEmpty()) - { - throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!"); - } + createChatRooms(records); } } catch (WakeupException e) @@ -240,7 +197,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener log.info("Exiting normally"); } - private void loadChatRoom(ConsumerRecords records) + private void loadChatRooms(ConsumerRecords records) { for (ConsumerRecord record : records) { @@ -278,6 +235,35 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener } } + + private void createChatRooms(ConsumerRecords records) + { + for (ConsumerRecord record : records) + { + UUID chatRoomId = UUID.fromString(record.key()); + + switch (record.value().getType()) + { + case COMMAND_CREATE_CHATROOM: + log.info("Received create-request for chat room: {}", chatRoomId); + createChatRoom( + chatRoomId, + (CommandCreateChatRoomTo) record.value(), + record.partition()); + break; + + default: + log.debug( + "Ignoring message for chat-room {} with offset {}: {}", + chatRoomId, + record.offset(), + record.value()); + } + + nextOffset[record.partition()] = record.offset() + 1; + } + } + private void createChatRoom( UUID chatRoomId, CommandCreateChatRoomTo createChatRoomRequestTo, @@ -339,15 +325,6 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]); } - private void pauseAllOwnedPartions() - { - consumer.pause(IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .mapToObj(shard -> new TopicPartition(topic, shard)) - .toList()); - } - private void putChatRoom( UUID chatRoomId, -- 2.20.1