From: Kai Moritz Date: Wed, 28 Feb 2024 10:14:32 +0000 (+0100) Subject: refactor: Introduced `ChannelMediator` X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=b1da0ca0f149cbff67962d05193048a17bd6ee4f;p=demos%2Fkafka%2Fchat refactor: Introduced `ChannelMediator` * `InfoChannel` and `DataChannel` must not know each other directly. * This is necessary, to prevent a cyclic dependency, that would otherwise be introduced, if `InfoChannel` also has to communicate with `DataChannel`. --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelMediator.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelMediator.java new file mode 100644 index 00000000..d6e028a3 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelMediator.java @@ -0,0 +1,32 @@ +package de.juplo.kafka.chat.backend.implementation.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import reactor.core.publisher.Mono; + +import java.util.UUID; + + +@RequiredArgsConstructor +public class ChannelMediator +{ + @Setter + private InfoChannel infoChannel; + + + void shardAssigned(int shard) + { + infoChannel.sendShardAssignedEvent(shard); + } + + void shardRevoked(int shard) + { + infoChannel.sendShardRevokedEvent(shard); + } + + Mono getChatRoomInfo(UUID id) + { + return infoChannel.getChatRoomInfo(id); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index b4cc33f5..64117f59 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -35,7 +35,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener private final long[] currentOffset; private final long[] nextOffset; private final Map[] chatRoomData; - private final InfoChannel infoChannel; + private final ChannelMediator channelMediator; private final ShardingPublisherStrategy shardingPublisherStrategy; private boolean running; @@ -53,7 +53,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener Duration pollingInterval, int bufferSize, Clock clock, - InfoChannel infoChannel, + ChannelMediator channelMediator, ShardingPublisherStrategy shardingPublisherStrategy) { log.debug( @@ -77,7 +77,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener IntStream .range(0, numShards) .forEach(shard -> this.chatRoomData[shard] = new HashMap<>()); - this.infoChannel = infoChannel; + this.channelMediator = channelMediator; this.shardingPublisherStrategy = shardingPublisherStrategy; } @@ -144,7 +144,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener currentOffset); consumer.seek(topicPartition, nextOffset[partition]); - infoChannel.sendShardAssignedEvent(partition); + channelMediator.shardAssigned(partition); shardingPublisherStrategy .publishOwnership(partition) .doOnSuccess(instanceId -> log.info( @@ -172,7 +172,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener isShardOwned[partition] = false; nextOffset[partition] = consumer.position(topicPartition); log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); - infoChannel.sendShardRevokedEvent(partition); + channelMediator.shardRevoked(partition); }); } @@ -324,7 +324,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener return Mono.error(new ShardNotOwnedException(instanceId, shard)); } - return infoChannel + return channelMediator .getChatRoomInfo(id) .map(chatRoomInfo -> chatRoomData[shard].computeIfAbsent(id, this::computeChatRoomData)); diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 6b7c1566..f78beb10 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -124,15 +124,18 @@ public class KafkaServicesConfiguration InfoChannel infoChannel( ChatBackendProperties properties, Producer producer, - Consumer infoChannelConsumer) + Consumer infoChannelConsumer, + ChannelMediator channelMediator) { - return new InfoChannel( + InfoChannel infoChannel = new InfoChannel( properties.getKafka().getInfoChannelTopic(), producer, infoChannelConsumer, properties.getKafka().getPollingInterval(), properties.getKafka().getNumPartitions(), properties.getKafka().getInstanceUri()); + channelMediator.setInfoChannel(infoChannel); + return infoChannel; } @Bean @@ -142,7 +145,7 @@ public class KafkaServicesConfiguration Consumer dataChannelConsumer, ZoneId zoneId, Clock clock, - InfoChannel infoChannel, + ChannelMediator channelMediator, ShardingPublisherStrategy shardingPublisherStrategy) { return new DataChannel( @@ -155,10 +158,16 @@ public class KafkaServicesConfiguration properties.getKafka().getPollingInterval(), properties.getChatroomBufferSize(), clock, - infoChannel, + channelMediator, shardingPublisherStrategy); } + @Bean + ChannelMediator channelMediator() + { + return new ChannelMediator(); + } + @Bean Producer producer( Properties defaultProducerProperties,