fix: Errors during shard-publishing should not kill the instance
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaServicesConfiguration.java
index c3027fa..6b7c156 100644 (file)
@@ -3,6 +3,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
 import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
+import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
@@ -21,8 +22,8 @@ import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import reactor.core.publisher.Mono;
 
+import java.net.InetSocketAddress;
 import java.time.Clock;
 import java.time.ZoneId;
 import java.util.HashMap;
@@ -108,7 +109,7 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-    ChatHomeService kafkaChatHome(
+  KafkaChatHomeService kafkaChatHome(
       ChatBackendProperties properties,
       InfoChannel infoChannel,
       DataChannel dataChannel)
@@ -129,6 +130,8 @@ public class KafkaServicesConfiguration
         properties.getKafka().getInfoChannelTopic(),
         producer,
         infoChannelConsumer,
+        properties.getKafka().getPollingInterval(),
+        properties.getKafka().getNumPartitions(),
         properties.getKafka().getInstanceUri());
   }
 
@@ -149,6 +152,7 @@ public class KafkaServicesConfiguration
         dataChannelConsumer,
         zoneId,
         properties.getKafka().getNumPartitions(),
+        properties.getKafka().getPollingInterval(),
         properties.getChatroomBufferSize(),
         clock,
         infoChannel,
@@ -285,15 +289,15 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  ShardingPublisherStrategy shardingPublisherStrategy()
+  ShardingPublisherStrategy shardingPublisherStrategy(
+      ChatBackendProperties properties)
   {
-    return new ShardingPublisherStrategy() {
-      @Override
-      public Mono<String> publishOwnership(int shard)
-      {
-        return Mono.just(Integer.toString(shard));
-      }
-    };
+    String[] parts = properties.getKafka().getHaproxyRuntimeApi().split(":");
+    InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1]));
+    return new HaproxyShardingPublisherStrategy(
+        haproxyAddress,
+        properties.getKafka().getHaproxyMap(),
+        properties.getInstanceId());
   }
 
   @Bean