X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FKafkaServicesConfiguration.java;h=58e111734d40b8c20a7bd39ff710951ece8a4353;hb=132c1d0092fc8377d92a4ded7ef349d858ae92cd;hp=d63111a7e04f618ef440e20f414491f22947d4db;hpb=386f950f8b329bf2b956fa7896e270a39037967d;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index d63111a7..58e11173 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -1,16 +1,18 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; -import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; +import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -18,10 +20,13 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import java.net.InetSocketAddress; import java.time.Clock; import java.time.ZoneId; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; @@ -34,35 +39,133 @@ import java.util.Properties; public class KafkaServicesConfiguration { @Bean - ChatHomeService kafkaChatHome( + KafkaServicesThreadPoolTaskExecutorCustomizer kafkaServicesThreadPoolTaskExecutorCustomizer() + { + return new KafkaServicesThreadPoolTaskExecutorCustomizer(); + } + + @Bean(initMethod = "executeChannelTask", destroyMethod = "join") + ChannelTaskExecutor infoChannelTaskExecutor( + ThreadPoolTaskExecutor taskExecutor, + InfoChannel infoChannel, + Consumer infoChannelConsumer, + WorkAssignor infoChannelWorkAssignor) + { + return new ChannelTaskExecutor( + taskExecutor, + infoChannel, + infoChannelConsumer, + infoChannelWorkAssignor); + } + + @Bean + WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties) + { + return consumer -> + { + String topic = properties.getKafka().getInfoChannelTopic(); + List partitions = consumer + .partitionsFor(topic) + .stream() + .map(partitionInfo -> + new TopicPartition(topic, partitionInfo.partition())) + .toList(); + consumer.assign(partitions); + }; + } + + @Bean(initMethod = "executeChannelTask", destroyMethod = "join") + ChannelTaskExecutor dataChannelTaskExecutor( + ThreadPoolTaskExecutor taskExecutor, + DataChannel dataChannel, + Consumer dataChannelConsumer, + WorkAssignor dataChannelWorkAssignor) + { + return new ChannelTaskExecutor( + taskExecutor, + dataChannel, + dataChannelConsumer, + dataChannelWorkAssignor); + } + + @Bean + WorkAssignor dataChannelWorkAssignor( + ChatBackendProperties properties, + DataChannel dataChannel) + { + return consumer -> + { + List topics = + List.of(properties.getKafka().getDataChannelTopic()); + consumer.subscribe(topics, dataChannel); + }; + } + + @Bean + KafkaChatHomeService kafkaChatHome( ChatBackendProperties properties, - ChatRoomChannel chatRoomChannel) + InfoChannel infoChannel, + DataChannel dataChannel) { return new KafkaChatHomeService( properties.getKafka().getNumPartitions(), - chatRoomChannel); + infoChannel, + dataChannel); } @Bean - ChatRoomChannel chatRoomChannel( + InfoChannel infoChannel( ChatBackendProperties properties, - Producer chatRoomChannelProducer, - Consumer chatRoomChannelConsumer, + Producer producer, + Consumer infoChannelConsumer, + ChannelMediator channelMediator) + { + InfoChannel infoChannel = new InfoChannel( + properties.getKafka().getInfoChannelTopic(), + producer, + infoChannelConsumer, + properties.getKafka().getPollingInterval(), + properties.getKafka().getNumPartitions(), + properties.getKafka().getInstanceUri(), + channelMediator); + channelMediator.setInfoChannel(infoChannel); + return infoChannel; + } + + @Bean + DataChannel dataChannel( + ChatBackendProperties properties, + Producer producer, + Consumer dataChannelConsumer, ZoneId zoneId, - Clock clock) + Clock clock, + ChannelMediator channelMediator, + ShardingPublisherStrategy shardingPublisherStrategy) { - return new ChatRoomChannel( - properties.getKafka().getChatRoomChannelTopic(), - chatRoomChannelProducer, - chatRoomChannelConsumer, + DataChannel dataChannel = new DataChannel( + properties.getInstanceId(), + properties.getKafka().getDataChannelTopic(), + producer, + dataChannelConsumer, zoneId, properties.getKafka().getNumPartitions(), - properties.getChatroomBufferSize(), - clock); + properties.getKafka().getPollingInterval(), + properties.getChatroomHistoryLimit(), + clock, + channelMediator, + shardingPublisherStrategy); + channelMediator.setDataChannel(dataChannel); + return dataChannel; } @Bean - Producer chatRoomChannelProducer( + ChannelMediator channelMediator() + { + return new ChannelMediator(); + } + + @Bean + Producer producer( Properties defaultProducerProperties, ChatBackendProperties chatBackendProperties, StringSerializer stringSerializer, @@ -72,7 +175,7 @@ public class KafkaServicesConfiguration defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); properties.put( ProducerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER"); + chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER"); return new KafkaProducer<>( properties, stringSerializer, @@ -97,7 +200,7 @@ public class KafkaServicesConfiguration } @Bean - Consumer chatRoomChannelConsumer( + Consumer infoChannelConsumer( Properties defaultConsumerProperties, ChatBackendProperties chatBackendProperties, StringDeserializer stringDeserializer, @@ -107,10 +210,31 @@ public class KafkaServicesConfiguration defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); properties.put( ConsumerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER"); + chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER"); properties.put( ConsumerConfig.GROUP_ID_CONFIG, - "chatroom_channel"); + "info_channel"); + return new KafkaConsumer<>( + properties, + stringDeserializer, + messageDeserializer); + } + + @Bean + Consumer dataChannelConsumer( + Properties defaultConsumerProperties, + ChatBackendProperties chatBackendProperties, + StringDeserializer stringDeserializer, + JsonDeserializer messageDeserializer) + { + Map properties = new HashMap<>(); + defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ConsumerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER"); + properties.put( + ConsumerConfig.GROUP_ID_CONFIG, + "data_channel"); return new KafkaConsumer<>( properties, stringDeserializer, @@ -139,7 +263,7 @@ public class KafkaServicesConfiguration String typeMappings () { return - "command_create_chatroom:" + CommandCreateChatRoomTo.class.getCanonicalName() + "," + + "event_chatroom_created:" + EventChatRoomCreated.class.getCanonicalName() + "," + "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName(); } @@ -169,9 +293,34 @@ public class KafkaServicesConfiguration return properties; } + @Bean + ShardingPublisherStrategy shardingPublisherStrategy( + ChatBackendProperties properties) + { + String[] parts = properties.getKafka().getHaproxyRuntimeApi().split(":"); + InetSocketAddress haproxyAddress = new InetSocketAddress(parts[0], Integer.valueOf(parts[1])); + return new HaproxyShardingPublisherStrategy( + haproxyAddress, + properties.getKafka().getHaproxyMap(), + properties.getInstanceId()); + } + @Bean ZoneId zoneId() { return ZoneId.systemDefault(); } + + @Bean + ChannelReactiveHealthIndicator dataChannelHealthIndicator( + DataChannel dataChannel) + { + return new ChannelReactiveHealthIndicator(dataChannel); + } + + @Bean + ChannelReactiveHealthIndicator infoChannelHealthIndicator(InfoChannel infoChannel) + { + return new ChannelReactiveHealthIndicator(infoChannel); + } }