From b1da0ca0f149cbff67962d05193048a17bd6ee4f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 28 Feb 2024 11:14:32 +0100 Subject: [PATCH] refactor: Introduced `ChannelMediator` * `InfoChannel` and `DataChannel` must not know each other directly. * This is necessary, to prevent a cyclic dependency, that would otherwise be introduced, if `InfoChannel` also has to communicate with `DataChannel`. --- .../implementation/kafka/ChannelMediator.java | 32 +++++++++++++++++++ .../implementation/kafka/DataChannel.java | 12 +++---- .../kafka/KafkaServicesConfiguration.java | 17 +++++++--- 3 files changed, 51 insertions(+), 10 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelMediator.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelMediator.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelMediator.java new file mode 100644 index 00000000..d6e028a3 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelMediator.java @@ -0,0 +1,32 @@ +package de.juplo.kafka.chat.backend.implementation.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import reactor.core.publisher.Mono; + +import java.util.UUID; + + +@RequiredArgsConstructor +public class ChannelMediator +{ + @Setter + private InfoChannel infoChannel; + + + void shardAssigned(int shard) + { + infoChannel.sendShardAssignedEvent(shard); + } + + void shardRevoked(int shard) + { + infoChannel.sendShardRevokedEvent(shard); + } + + Mono getChatRoomInfo(UUID id) + { + return infoChannel.getChatRoomInfo(id); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index b4cc33f5..64117f59 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -35,7 +35,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener private final long[] currentOffset; private final long[] nextOffset; private final Map[] chatRoomData; - private final InfoChannel infoChannel; + private final ChannelMediator channelMediator; private final ShardingPublisherStrategy shardingPublisherStrategy; private boolean running; @@ -53,7 +53,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener Duration pollingInterval, int bufferSize, Clock clock, - InfoChannel infoChannel, + ChannelMediator channelMediator, ShardingPublisherStrategy shardingPublisherStrategy) { log.debug( @@ -77,7 +77,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener IntStream .range(0, numShards) .forEach(shard -> this.chatRoomData[shard] = new HashMap<>()); - this.infoChannel = infoChannel; + this.channelMediator = channelMediator; this.shardingPublisherStrategy = shardingPublisherStrategy; } @@ -144,7 +144,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener currentOffset); consumer.seek(topicPartition, nextOffset[partition]); - infoChannel.sendShardAssignedEvent(partition); + channelMediator.shardAssigned(partition); shardingPublisherStrategy .publishOwnership(partition) .doOnSuccess(instanceId -> log.info( @@ -172,7 +172,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener isShardOwned[partition] = false; nextOffset[partition] = consumer.position(topicPartition); log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); - infoChannel.sendShardRevokedEvent(partition); + channelMediator.shardRevoked(partition); }); } @@ -324,7 +324,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener return Mono.error(new ShardNotOwnedException(instanceId, shard)); } - return infoChannel + return channelMediator .getChatRoomInfo(id) .map(chatRoomInfo -> chatRoomData[shard].computeIfAbsent(id, this::computeChatRoomData)); diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 6b7c1566..f78beb10 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -124,15 +124,18 @@ public class KafkaServicesConfiguration InfoChannel infoChannel( ChatBackendProperties properties, Producer producer, - Consumer infoChannelConsumer) + Consumer infoChannelConsumer, + ChannelMediator channelMediator) { - return new InfoChannel( + InfoChannel infoChannel = new InfoChannel( properties.getKafka().getInfoChannelTopic(), producer, infoChannelConsumer, properties.getKafka().getPollingInterval(), properties.getKafka().getNumPartitions(), properties.getKafka().getInstanceUri()); + channelMediator.setInfoChannel(infoChannel); + return infoChannel; } @Bean @@ -142,7 +145,7 @@ public class KafkaServicesConfiguration Consumer dataChannelConsumer, ZoneId zoneId, Clock clock, - InfoChannel infoChannel, + ChannelMediator channelMediator, ShardingPublisherStrategy shardingPublisherStrategy) { return new DataChannel( @@ -155,10 +158,16 @@ public class KafkaServicesConfiguration properties.getKafka().getPollingInterval(), properties.getChatroomBufferSize(), clock, - infoChannel, + channelMediator, shardingPublisherStrategy); } + @Bean + ChannelMediator channelMediator() + { + return new ChannelMediator(); + } + @Bean Producer producer( Properties defaultProducerProperties, -- 2.20.1