From: Kai Moritz Date: Wed, 13 Sep 2023 18:54:27 +0000 (+0200) Subject: WIP X-Git-Tag: rebase--2023-09-14--22-59~8 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=2c472509009c5770e26922096e01c9e749700139;p=demos%2Fkafka%2Fchat WIP --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 018dc653..18611d41 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -4,6 +4,7 @@ import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -12,6 +13,7 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -64,25 +66,41 @@ public class KafkaServicesConfiguration @Bean ChatHomeService kafkaChatHome( ChatBackendProperties properties, + InfoChannel infoChannel, DataChannel dataChannel) { return new KafkaChatHomeService( properties.getKafka().getNumPartitions(), + infoChannel, dataChannel); } @Bean - DataChannel chatRoomChannel( + InfoChannel infoChannel( ChatBackendProperties properties, - Producer chatRoomChannelProducer, - Consumer chatRoomChannelConsumer, + Producer producer, + Consumer infoChannelConsumer, + DataChannel dataChannel) + { + return new InfoChannel( + properties.getKafka().getInfoChannelTopic(), + producer, + infoChannelConsumer, + dataChannel); + } + + @Bean + DataChannel dataChannel( + ChatBackendProperties properties, + Producer producer, + Consumer dataChannelConsumer, ZoneId zoneId, Clock clock) { return new DataChannel( properties.getKafka().getDataChannelTopic(), - chatRoomChannelProducer, - chatRoomChannelConsumer, + producer, + dataChannelConsumer, zoneId, properties.getKafka().getNumPartitions(), properties.getChatroomBufferSize(), @@ -90,7 +108,7 @@ public class KafkaServicesConfiguration } @Bean - Producer chatRoomChannelProducer( + Producer producer( Properties defaultProducerProperties, ChatBackendProperties chatBackendProperties, StringSerializer stringSerializer, @@ -100,7 +118,7 @@ public class KafkaServicesConfiguration defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); properties.put( ProducerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER"); + chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER"); return new KafkaProducer<>( properties, stringSerializer, @@ -125,7 +143,28 @@ public class KafkaServicesConfiguration } @Bean - Consumer chatRoomChannelConsumer( + Consumer infoChannelConsumer( + Properties defaultConsumerProperties, + ChatBackendProperties chatBackendProperties, + StringDeserializer stringDeserializer, + JsonDeserializer messageDeserializer) + { + Map properties = new HashMap<>(); + defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ConsumerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER"); + properties.put( + ConsumerConfig.GROUP_ID_CONFIG, + "info_channel"); + return new KafkaConsumer<>( + properties, + stringDeserializer, + messageDeserializer); + } + + @Bean + Consumer dataChannelConsumer( Properties defaultConsumerProperties, ChatBackendProperties chatBackendProperties, StringDeserializer stringDeserializer, @@ -135,10 +174,10 @@ public class KafkaServicesConfiguration defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); properties.put( ConsumerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER"); + chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER"); properties.put( ConsumerConfig.GROUP_ID_CONFIG, - "chatroom_channel"); + "data_channel"); return new KafkaConsumer<>( properties, stringDeserializer, @@ -167,7 +206,7 @@ public class KafkaServicesConfiguration String typeMappings () { return - "command_create_chatroom:" + CommandCreateChatRoomTo.class.getCanonicalName() + "," + + "event_chatroom_created:" + EventChatRoomCreated.class.getCanonicalName() + "," + "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName(); }