WIP:fix:activation
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaServicesConfiguration.java
index c3027fa..58e1117 100644 (file)
@@ -1,8 +1,8 @@
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
 import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
+import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
@@ -21,8 +21,8 @@ import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import reactor.core.publisher.Mono;
 
+import java.net.InetSocketAddress;
 import java.time.Clock;
 import java.time.ZoneId;
 import java.util.HashMap;
@@ -39,25 +39,19 @@ import java.util.Properties;
 public class KafkaServicesConfiguration
 {
   @Bean
-  ConsumerTaskRunner consumerTaskRunner(
-      ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
-      ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
-      InfoChannel infoChannel)
+  KafkaServicesThreadPoolTaskExecutorCustomizer kafkaServicesThreadPoolTaskExecutorCustomizer()
   {
-    return new ConsumerTaskRunner(
-        infoChannelConsumerTaskExecutor,
-        dataChannelConsumerTaskExecutor,
-        infoChannel);
+    return new KafkaServicesThreadPoolTaskExecutorCustomizer();
   }
 
-  @Bean
-  ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
+  @Bean(initMethod = "executeChannelTask", destroyMethod = "join")
+  ChannelTaskExecutor infoChannelTaskExecutor(
       ThreadPoolTaskExecutor taskExecutor,
       InfoChannel infoChannel,
       Consumer<String, AbstractMessageTo> infoChannelConsumer,
       WorkAssignor infoChannelWorkAssignor)
   {
-    return new ConsumerTaskExecutor(
+    return new ChannelTaskExecutor(
         taskExecutor,
         infoChannel,
         infoChannelConsumer,
@@ -80,14 +74,14 @@ public class KafkaServicesConfiguration
     };
   }
 
-  @Bean
-  ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
+  @Bean(initMethod = "executeChannelTask", destroyMethod = "join")
+  ChannelTaskExecutor dataChannelTaskExecutor(
       ThreadPoolTaskExecutor taskExecutor,
       DataChannel dataChannel,
       Consumer<String, AbstractMessageTo> dataChannelConsumer,
       WorkAssignor dataChannelWorkAssignor)
   {
-    return new ConsumerTaskExecutor(
+    return new ChannelTaskExecutor(
         taskExecutor,
         dataChannel,
         dataChannelConsumer,
@@ -108,7 +102,7 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-    ChatHomeService kafkaChatHome(
+  KafkaChatHomeService kafkaChatHome(
       ChatBackendProperties properties,
       InfoChannel infoChannel,
       DataChannel dataChannel)
@@ -123,13 +117,19 @@ public class KafkaServicesConfiguration
   InfoChannel infoChannel(
       ChatBackendProperties properties,
       Producer<String, AbstractMessageTo> producer,
-      Consumer<String, AbstractMessageTo> infoChannelConsumer)
+      Consumer<String, AbstractMessageTo> infoChannelConsumer,
+      ChannelMediator channelMediator)
   {
-    return new InfoChannel(
+    InfoChannel infoChannel = new InfoChannel(
         properties.getKafka().getInfoChannelTopic(),
         producer,
         infoChannelConsumer,
-        properties.getKafka().getInstanceUri());
+        properties.getKafka().getPollingInterval(),
+        properties.getKafka().getNumPartitions(),
+        properties.getKafka().getInstanceUri(),
+        channelMediator);
+    channelMediator.setInfoChannel(infoChannel);
+    return infoChannel;
   }
 
   @Bean
@@ -139,20 +139,29 @@ public class KafkaServicesConfiguration
       Consumer<String, AbstractMessageTo> dataChannelConsumer,
       ZoneId zoneId,
       Clock clock,
-      InfoChannel infoChannel,
+      ChannelMediator channelMediator,
       ShardingPublisherStrategy shardingPublisherStrategy)
   {
-    return new DataChannel(
+    DataChannel dataChannel = new DataChannel(
         properties.getInstanceId(),
         properties.getKafka().getDataChannelTopic(),
         producer,
         dataChannelConsumer,
         zoneId,
         properties.getKafka().getNumPartitions(),
-        properties.getChatroomBufferSize(),
+        properties.getKafka().getPollingInterval(),
+        properties.getChatroomHistoryLimit(),
         clock,
-        infoChannel,
+        channelMediator,
         shardingPublisherStrategy);
+    channelMediator.setDataChannel(dataChannel);
+    return dataChannel;
+  }
+
+  @Bean
+  ChannelMediator channelMediator()
+  {
+    return new ChannelMediator();
   }
 
   @Bean
@@ -285,15 +294,15 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  ShardingPublisherStrategy shardingPublisherStrategy()
+  ShardingPublisherStrategy shardingPublisherStrategy(
+      ChatBackendProperties properties)
   {
-    return new ShardingPublisherStrategy() {
-      @Override
-      public Mono<String> publishOwnership(int shard)
-      {
-        return Mono.just(Integer.toString(shard));
-      }
-    };
+    String[] parts = properties.getKafka().getHaproxyRuntimeApi().split(":");
+    InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1]));
+    return new HaproxyShardingPublisherStrategy(
+        haproxyAddress,
+        properties.getKafka().getHaproxyMap(),
+        properties.getInstanceId());
   }
 
   @Bean
@@ -301,4 +310,17 @@ public class KafkaServicesConfiguration
   {
     return ZoneId.systemDefault();
   }
+
+  @Bean
+  ChannelReactiveHealthIndicator dataChannelHealthIndicator(
+      DataChannel dataChannel)
+  {
+    return new ChannelReactiveHealthIndicator(dataChannel);
+  }
+
+  @Bean
+  ChannelReactiveHealthIndicator infoChannelHealthIndicator(InfoChannel infoChannel)
+  {
+    return new ChannelReactiveHealthIndicator(infoChannel);
+  }
 }