private int numPartitions = 2;
private Duration pollingInterval = Duration.ofSeconds(1);
private String haproxyRuntimeApi = "haproxy:8401";
+ private String haproxyDataPlaneApi = "http://haproxy:5555/v2/";
+ private String haproxyUser = "juplo";
+ private String haproxyPassword = "juplo";
private String haproxyMap = "sharding";
}
import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
+import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyDataPlaneApiShardingPublisherStrategy;
import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyRuntimeApiShardingPublisherStrategy;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.web.reactive.function.client.WebClient;
import java.net.InetSocketAddress;
import java.time.Clock;
ShardingPublisherStrategy shardingPublisherStrategy(
ChatBackendProperties properties)
{
- String[] parts = properties.getKafka().getHaproxyRuntimeApi().split(":");
- InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1]));
- return new HaproxyRuntimeApiShardingPublisherStrategy(
- haproxyAddress,
+ return new HaproxyDataPlaneApiShardingPublisherStrategy(
+ WebClient
+ .builder()
+ .baseUrl(properties.getKafka().getHaproxyDataPlaneApi())
+ .defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth(
+ properties.getKafka().getHaproxyUser(),
+ properties.getKafka().getHaproxyPassword()))
+ .build(),
properties.getKafka().getHaproxyMap(),
properties.getInstanceId());
}