From 0b02ef4ca02908f49ee97607ce0b6c33656d47dd Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 13 Sep 2023 20:54:27 +0200 Subject: [PATCH] WIP --- .../kafka/KafkaServicesConfiguration.java | 61 +++++++++++++++---- 1 file changed, 50 insertions(+), 11 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index caa9228a..7833d2ae 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -4,6 +4,7 @@ import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -12,6 +13,7 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -35,25 +37,41 @@ public class KafkaServicesConfiguration @Bean ChatHomeService kafkaChatHome( ChatBackendProperties properties, + InfoChannel infoChannel, DataChannel dataChannel) { return new KafkaChatHomeService( properties.getKafka().getNumPartitions(), + infoChannel, dataChannel); } @Bean - DataChannel chatRoomChannel( + InfoChannel infoChannel( ChatBackendProperties properties, - Producer chatRoomChannelProducer, - Consumer chatRoomChannelConsumer, + Producer producer, + Consumer infoChannelConsumer, + DataChannel dataChannel) + { + return new InfoChannel( + properties.getKafka().getInfoChannelTopic(), + producer, + infoChannelConsumer, + dataChannel); + } + + @Bean + DataChannel dataChannel( + ChatBackendProperties properties, + Producer producer, + Consumer dataChannelConsumer, ZoneId zoneId, Clock clock) { return new DataChannel( properties.getKafka().getDataChannelTopic(), - chatRoomChannelProducer, - chatRoomChannelConsumer, + producer, + dataChannelConsumer, zoneId, properties.getKafka().getNumPartitions(), properties.getChatroomBufferSize(), @@ -61,7 +79,7 @@ public class KafkaServicesConfiguration } @Bean - Producer chatRoomChannelProducer( + Producer producer( Properties defaultProducerProperties, ChatBackendProperties chatBackendProperties, StringSerializer stringSerializer, @@ -71,7 +89,7 @@ public class KafkaServicesConfiguration defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); properties.put( ProducerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER"); + chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER"); return new KafkaProducer<>( properties, stringSerializer, @@ -96,7 +114,28 @@ public class KafkaServicesConfiguration } @Bean - Consumer chatRoomChannelConsumer( + Consumer infoChannelConsumer( + Properties defaultConsumerProperties, + ChatBackendProperties chatBackendProperties, + StringDeserializer stringDeserializer, + JsonDeserializer messageDeserializer) + { + Map properties = new HashMap<>(); + defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ConsumerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER"); + properties.put( + ConsumerConfig.GROUP_ID_CONFIG, + "info_channel"); + return new KafkaConsumer<>( + properties, + stringDeserializer, + messageDeserializer); + } + + @Bean + Consumer dataChannelConsumer( Properties defaultConsumerProperties, ChatBackendProperties chatBackendProperties, StringDeserializer stringDeserializer, @@ -106,10 +145,10 @@ public class KafkaServicesConfiguration defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); properties.put( ConsumerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER"); + chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER"); properties.put( ConsumerConfig.GROUP_ID_CONFIG, - "chatroom_channel"); + "data_channel"); return new KafkaConsumer<>( properties, stringDeserializer, @@ -138,7 +177,7 @@ public class KafkaServicesConfiguration String typeMappings () { return - "command_create_chatroom:" + CommandCreateChatRoomTo.class.getCanonicalName() + "," + + "event_chatroom_created:" + EventChatRoomCreated.class.getCanonicalName() + "," + "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName(); } -- 2.20.1