X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FKafkaServicesConfiguration.java;h=c3027fa973a668c68573a0f4fb5a3c56a0abdd8d;hb=d53ef6cdaa5480e2f6a01499603dcb9f32350c1b;hp=7bb1ab972ba8c3715abdc0359385be61f1377048;hpb=2adb71f86149f3b7da0ecd5b5eba5ba8341ad503;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 7bb1ab97..c3027fa9 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; @@ -20,6 +21,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import reactor.core.publisher.Mono; import java.time.Clock; import java.time.ZoneId; @@ -137,7 +139,8 @@ public class KafkaServicesConfiguration Consumer dataChannelConsumer, ZoneId zoneId, Clock clock, - InfoChannel infoChannel) + InfoChannel infoChannel, + ShardingPublisherStrategy shardingPublisherStrategy) { return new DataChannel( properties.getInstanceId(), @@ -148,7 +151,8 @@ public class KafkaServicesConfiguration properties.getKafka().getNumPartitions(), properties.getChatroomBufferSize(), clock, - infoChannel); + infoChannel, + shardingPublisherStrategy); } @Bean @@ -280,6 +284,18 @@ public class KafkaServicesConfiguration return properties; } + @Bean + ShardingPublisherStrategy shardingPublisherStrategy() + { + return new ShardingPublisherStrategy() { + @Override + public Mono publishOwnership(int shard) + { + return Mono.just(Integer.toString(shard)); + } + }; + } + @Bean ZoneId zoneId() {