import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
+import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import java.net.InetSocketAddress;
import java.time.Clock;
import java.time.ZoneId;
import java.util.HashMap;
public class KafkaServicesConfiguration
{
@Bean
- ConsumerTaskExecutor chatRoomChannelTaskExecutor(
+ ConsumerTaskRunner consumerTaskRunner(
+ ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
+ ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
+ InfoChannel infoChannel)
+ {
+ return new ConsumerTaskRunner(
+ infoChannelConsumerTaskExecutor,
+ dataChannelConsumerTaskExecutor,
+ infoChannel);
+ }
+
+ @Bean
+ ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
ThreadPoolTaskExecutor taskExecutor,
- ChatRoomChannel chatRoomChannel,
- Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
- ConsumerTaskExecutor.WorkAssignor workAssignor)
+ InfoChannel infoChannel,
+ Consumer<String, AbstractMessageTo> infoChannelConsumer,
+ WorkAssignor infoChannelWorkAssignor)
{
return new ConsumerTaskExecutor(
taskExecutor,
- chatRoomChannel,
- chatRoomChannelConsumer,
- workAssignor);
+ infoChannel,
+ infoChannelConsumer,
+ infoChannelWorkAssignor);
+ }
+
+ @Bean
+ WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
+ {
+ return consumer ->
+ {
+ String topic = properties.getKafka().getInfoChannelTopic();
+ List<TopicPartition> partitions = consumer
+ .partitionsFor(topic)
+ .stream()
+ .map(partitionInfo ->
+ new TopicPartition(topic, partitionInfo.partition()))
+ .toList();
+ consumer.assign(partitions);
+ };
}
@Bean
- ConsumerTaskExecutor.WorkAssignor workAssignor(
+ ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
+ ThreadPoolTaskExecutor taskExecutor,
+ DataChannel dataChannel,
+ Consumer<String, AbstractMessageTo> dataChannelConsumer,
+ WorkAssignor dataChannelWorkAssignor)
+ {
+ return new ConsumerTaskExecutor(
+ taskExecutor,
+ dataChannel,
+ dataChannelConsumer,
+ dataChannelWorkAssignor);
+ }
+
+ @Bean
+ WorkAssignor dataChannelWorkAssignor(
ChatBackendProperties properties,
- ChatRoomChannel chatRoomChannel)
+ DataChannel dataChannel)
{
return consumer ->
{
List<String> topics =
- List.of(properties.getKafka().getChatRoomChannelTopic());
- consumer.subscribe(topics, chatRoomChannel);
+ List.of(properties.getKafka().getDataChannelTopic());
+ consumer.subscribe(topics, dataChannel);
};
}
@Bean
ChatHomeService kafkaChatHome(
ChatBackendProperties properties,
- ChatRoomChannel chatRoomChannel)
+ InfoChannel infoChannel,
+ DataChannel dataChannel)
{
return new KafkaChatHomeService(
properties.getKafka().getNumPartitions(),
- chatRoomChannel);
+ infoChannel,
+ dataChannel);
+ }
+
+ @Bean
+ InfoChannel infoChannel(
+ ChatBackendProperties properties,
+ Producer<String, AbstractMessageTo> producer,
+ Consumer<String, AbstractMessageTo> infoChannelConsumer)
+ {
+ return new InfoChannel(
+ properties.getKafka().getInfoChannelTopic(),
+ producer,
+ infoChannelConsumer,
+ properties.getKafka().getInstanceUri());
}
@Bean
- ChatRoomChannel chatRoomChannel(
+ DataChannel dataChannel(
ChatBackendProperties properties,
- Producer<String, AbstractMessageTo> chatRoomChannelProducer,
- Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
+ Producer<String, AbstractMessageTo> producer,
+ Consumer<String, AbstractMessageTo> dataChannelConsumer,
ZoneId zoneId,
- Clock clock)
+ Clock clock,
+ InfoChannel infoChannel,
+ ShardingPublisherStrategy shardingPublisherStrategy)
{
- return new ChatRoomChannel(
- properties.getKafka().getChatRoomChannelTopic(),
- chatRoomChannelProducer,
- chatRoomChannelConsumer,
+ return new DataChannel(
+ properties.getInstanceId(),
+ properties.getKafka().getDataChannelTopic(),
+ producer,
+ dataChannelConsumer,
zoneId,
properties.getKafka().getNumPartitions(),
properties.getChatroomBufferSize(),
- clock);
+ clock,
+ infoChannel,
+ shardingPublisherStrategy);
}
@Bean
- Producer<String, AbstractMessageTo> chatRoomChannelProducer(
+ Producer<String, AbstractMessageTo> producer(
Properties defaultProducerProperties,
ChatBackendProperties chatBackendProperties,
StringSerializer stringSerializer,
defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
properties.put(
ProducerConfig.CLIENT_ID_CONFIG,
- chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
+ chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
return new KafkaProducer<>(
properties,
stringSerializer,
}
@Bean
- Consumer<String, AbstractMessageTo> chatRoomChannelConsumer(
+ Consumer<String, AbstractMessageTo> infoChannelConsumer(
Properties defaultConsumerProperties,
ChatBackendProperties chatBackendProperties,
StringDeserializer stringDeserializer,
defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
properties.put(
ConsumerConfig.CLIENT_ID_CONFIG,
- chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
+ chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
properties.put(
ConsumerConfig.GROUP_ID_CONFIG,
- "chatroom_channel");
+ "info_channel");
+ return new KafkaConsumer<>(
+ properties,
+ stringDeserializer,
+ messageDeserializer);
+ }
+
+ @Bean
+ Consumer<String, AbstractMessageTo> dataChannelConsumer(
+ Properties defaultConsumerProperties,
+ ChatBackendProperties chatBackendProperties,
+ StringDeserializer stringDeserializer,
+ JsonDeserializer<AbstractMessageTo> messageDeserializer)
+ {
+ Map<String, Object> properties = new HashMap<>();
+ defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+ properties.put(
+ ConsumerConfig.CLIENT_ID_CONFIG,
+ chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
+ properties.put(
+ ConsumerConfig.GROUP_ID_CONFIG,
+ "data_channel");
return new KafkaConsumer<>(
properties,
stringDeserializer,
String typeMappings ()
{
return
- "command_create_chatroom:" + CommandCreateChatRoomTo.class.getCanonicalName() + "," +
+ "event_chatroom_created:" + EventChatRoomCreated.class.getCanonicalName() + "," +
"event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
}
return properties;
}
+ @Bean
+ ShardingPublisherStrategy shardingPublisherStrategy(
+ ChatBackendProperties properties)
+ {
+ String[] parts = properties.getKafka().getHaproxyRuntimeApi().split(":");
+ InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1]));
+ return new HaproxyShardingPublisherStrategy(
+ haproxyAddress,
+ properties.getKafka().getHaproxyMap(),
+ properties.getInstanceId());
+ }
+
@Bean
ZoneId zoneId()
{