From: Kai Moritz Date: Tue, 20 Feb 2024 08:53:10 +0000 (+0100) Subject: feat: Introduced interface `ShardingPublisherStrategy` X-Git-Tag: rebase--2024-02-20--10-29~28 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=d53ef6cdaa5480e2f6a01499603dcb9f32350c1b;p=demos%2Fkafka%2Fchat feat: Introduced interface `ShardingPublisherStrategy` * The interface is used by `DataChannel` to publish the changed ownership each time, a new partition is assigned to the consumer-group. * Added a dummy-implementation in `KafkaServicesConfiguration`. --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardingPublisherStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardingPublisherStrategy.java new file mode 100644 index 00000000..9a1c725e --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardingPublisherStrategy.java @@ -0,0 +1,9 @@ +package de.juplo.kafka.chat.backend.domain; + +import reactor.core.publisher.Mono; + + +public interface ShardingPublisherStrategy +{ + Mono publishOwnership(int shard); +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 9cafbaaa..2fa4998e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -38,6 +38,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener private final long[] nextOffset; private final Map[] chatRoomData; private final InfoChannel infoChannel; + private final ShardingPublisherStrategy shardingPublisherStrategy; private boolean running; @Getter @@ -53,7 +54,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener int numShards, int bufferSize, Clock clock, - InfoChannel infoChannel) + InfoChannel infoChannel, + ShardingPublisherStrategy shardingPublisherStrategy) { log.debug( "{}: Creating DataChannel for topic {} with {} partitions", @@ -76,6 +78,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener .range(0, numShards) .forEach(shard -> this.chatRoomData[shard] = new HashMap<>()); this.infoChannel = infoChannel; + this.shardingPublisherStrategy = shardingPublisherStrategy; } @@ -142,6 +145,13 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener consumer.seek(topicPartition, nextOffset[partition]); infoChannel.sendShardAssignedEvent(partition); + shardingPublisherStrategy + .publishOwnership(partition) + .doOnNext(instanceId -> log.info( + "Instance {} was published as owner of shard {}", + instanceId, + partition)) + .subscribe(); }); consumer.resume(partitions); diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 7bb1ab97..c3027fa9 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; @@ -20,6 +21,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import reactor.core.publisher.Mono; import java.time.Clock; import java.time.ZoneId; @@ -137,7 +139,8 @@ public class KafkaServicesConfiguration Consumer dataChannelConsumer, ZoneId zoneId, Clock clock, - InfoChannel infoChannel) + InfoChannel infoChannel, + ShardingPublisherStrategy shardingPublisherStrategy) { return new DataChannel( properties.getInstanceId(), @@ -148,7 +151,8 @@ public class KafkaServicesConfiguration properties.getKafka().getNumPartitions(), properties.getChatroomBufferSize(), clock, - infoChannel); + infoChannel, + shardingPublisherStrategy); } @Bean @@ -280,6 +284,18 @@ public class KafkaServicesConfiguration return properties; } + @Bean + ShardingPublisherStrategy shardingPublisherStrategy() + { + return new ShardingPublisherStrategy() { + @Override + public Mono publishOwnership(int shard) + { + return Mono.just(Integer.toString(shard)); + } + }; + } + @Bean ZoneId zoneId() { diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java index 956d7cec..52a527d3 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java @@ -1,6 +1,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; +import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; @@ -10,6 +11,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; +import reactor.core.publisher.Mono; import java.time.Clock; import java.util.List; @@ -23,6 +25,12 @@ public class KafkaTestUtils @Import(KafkaServicesConfiguration.class) public static class KafkaTestConfiguration { + @Bean + public ShardingPublisherStrategy shardingPublisherStrategy() + { + return shard -> Mono.just("MOCKED!"); + } + @Bean public WorkAssignor dataChannelWorkAssignor( ChatBackendProperties properties,