import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
+import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.web.reactive.function.client.WebClient;
 
 import java.time.Clock;
 import java.time.ZoneId;
       Consumer<String, AbstractMessageTo> dataChannelConsumer,
       ZoneId zoneId,
       Clock clock,
-      InfoChannel infoChannel)
+      InfoChannel infoChannel,
+      ShardingPublisherStrategy shardingPublisherStrategy)
   {
     return new DataChannel(
         properties.getKafka().getDataChannelTopic(),
         properties.getKafka().getNumPartitions(),
         properties.getChatroomBufferSize(),
         clock,
-        infoChannel);
+        infoChannel,
+        shardingPublisherStrategy);
   }
 
   @Bean
     return properties;
   }
 
+  @Bean
+  ShardingPublisherStrategy shardingPublisherStrategy(
+      ChatBackendProperties properties,
+      WebClient webClient)
+  {
+    return new HaproxyShardingPublisherStrategy(
+        properties.getHaproxyRuntimeApiUri(),
+        webClient);
+  }
+
   @Bean
   ZoneId zoneId()
   {