feat: Introduced interface `ShardingPublisherStrategy`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaServicesConfiguration.java
index 784ffa5..c3027fa 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
@@ -20,6 +21,7 @@ import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import reactor.core.publisher.Mono;
 
 import java.time.Clock;
 import java.time.ZoneId;
@@ -39,11 +41,13 @@ public class KafkaServicesConfiguration
   @Bean
   ConsumerTaskRunner consumerTaskRunner(
       ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
-      ConsumerTaskExecutor dataChannelConsumerTaskExecutor)
+      ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
+      InfoChannel infoChannel)
   {
     return new ConsumerTaskRunner(
         infoChannelConsumerTaskExecutor,
-        dataChannelConsumerTaskExecutor);
+        dataChannelConsumerTaskExecutor,
+        infoChannel);
   }
 
   @Bean
@@ -124,7 +128,8 @@ public class KafkaServicesConfiguration
     return new InfoChannel(
         properties.getKafka().getInfoChannelTopic(),
         producer,
-        infoChannelConsumer);
+        infoChannelConsumer,
+        properties.getKafka().getInstanceUri());
   }
 
   @Bean
@@ -134,9 +139,11 @@ public class KafkaServicesConfiguration
       Consumer<String, AbstractMessageTo> dataChannelConsumer,
       ZoneId zoneId,
       Clock clock,
-      InfoChannel infoChannel)
+      InfoChannel infoChannel,
+      ShardingPublisherStrategy shardingPublisherStrategy)
   {
     return new DataChannel(
+        properties.getInstanceId(),
         properties.getKafka().getDataChannelTopic(),
         producer,
         dataChannelConsumer,
@@ -144,7 +151,8 @@ public class KafkaServicesConfiguration
         properties.getKafka().getNumPartitions(),
         properties.getChatroomBufferSize(),
         clock,
-        infoChannel);
+        infoChannel,
+        shardingPublisherStrategy);
   }
 
   @Bean
@@ -276,6 +284,18 @@ public class KafkaServicesConfiguration
     return properties;
   }
 
+  @Bean
+  ShardingPublisherStrategy shardingPublisherStrategy()
+  {
+    return new ShardingPublisherStrategy() {
+      @Override
+      public Mono<String> publishOwnership(int shard)
+      {
+        return Mono.just(Integer.toString(shard));
+      }
+    };
+  }
+
   @Bean
   ZoneId zoneId()
   {