X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FKafkaServicesConfiguration.java;h=c157cce3779cdb8feb4f95537bfbf27b310702ea;hb=5b0f5543b2d514baba0fd020adffe410e4b64d4e;hp=55aa6f84802fae279932f9bcd0d15c82f27c99f0;hpb=2a47c081a50f8ebdda6db9955c3ddc69e5c601c2;p=demos%2Fkafka%2Fchat diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index 55aa6f84..c157cce3 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -1,23 +1,29 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; -import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType; import de.juplo.kafka.chat.backend.domain.ChatHome; -import de.juplo.kafka.chat.backend.domain.ShardedChatHome; import de.juplo.kafka.chat.backend.domain.ShardingStrategy; -import de.juplo.kafka.chat.backend.domain.SimpleChatHome; import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; -import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService; -import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; -import org.springframework.boot.ApplicationRunner; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerializer; import java.time.Clock; import java.time.ZoneId; +import java.util.HashMap; +import java.util.Map; @ConditionalOnProperty( @@ -25,7 +31,7 @@ import java.time.ZoneId; name = "services", havingValue = "kafka") @Configuration -public class KafkaServicesConfiguration implements ApplicationRunner +public class KafkaServicesConfiguration { @Bean ChatHome kafkaChatHome( @@ -60,6 +66,66 @@ public class KafkaServicesConfiguration implements ApplicationRunner properties.getChatroomBufferSize()); } + @Bean + Producer chatRoomChannelProducer( + Map defaultProducerProperties, + IntegerSerializer integerSerializer, + JsonSerializer chatRoomSerializer) + { + return new KafkaProducer<>( + defaultProducerProperties, + integerSerializer, + chatRoomSerializer); + } + + @Bean + IntegerSerializer integerSerializer() + { + return new IntegerSerializer(); + } + + @Bean + JsonSerializer chatRoomSerializer() + { + JsonSerializer serializer = new JsonSerializer<>(); + return serializer; + } + + @Bean + Consumer chatRoomChannelConsumer( + Map defaultConsumerProperties, + IntegerDeserializer integerDeserializer, + JsonDeserializer chatRoomDeserializer) + { + Map properties = new HashMap<>(defaultConsumerProperties); + properties.put( + ConsumerConfig.GROUP_ID_CONFIG, + "chat_room_channel"); + return new KafkaConsumer<>( + properties, + integerDeserializer, + chatRoomDeserializer); + } + + @Bean + IntegerDeserializer integerDeserializer() + { + return new IntegerDeserializer(); + } + + @Bean + JsonDeserializer chatRoomDeserializer() + { + JsonDeserializer deserializer = new JsonDeserializer<>(); + return deserializer; + } + + @Bean + ShardingStrategy shardingStrategy(ChatBackendProperties properties) + { + return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions()); + } + @Bean ChatMessageChannel chatMessageChannel( ChatBackendProperties properties, @@ -76,9 +142,81 @@ public class KafkaServicesConfiguration implements ApplicationRunner } @Bean - ShardingStrategy shardingStrategy(ChatBackendProperties properties) + Producer chatMessageChannelProducer( + Map defaultProducerProperties, + StringSerializer stringSerializer, + JsonSerializer messageSerializer) { - return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions()); + return new KafkaProducer<>( + defaultProducerProperties, + stringSerializer, + messageSerializer); + } + + @Bean + StringSerializer stringSerializer() + { + return new StringSerializer(); + } + + @Bean + JsonSerializer chatMessageSerializer() + { + JsonSerializer serializer = new JsonSerializer<>(); + return serializer; + } + + @Bean + Consumer chatMessageChannelConsumer( + Map defaultConsumerProperties, + StringDeserializer stringDeserializer, + JsonDeserializer messageDeserializer) + { + Map properties = new HashMap<>(defaultConsumerProperties); + properties.put( + ConsumerConfig.GROUP_ID_CONFIG, + "chat_message_channel"); + return new KafkaConsumer<>( + properties, + stringDeserializer, + messageDeserializer); + } + + @Bean + StringDeserializer stringDeserializer() + { + return new StringDeserializer(); + } + + @Bean + JsonDeserializer chatMessageDeserializer() + { + JsonDeserializer deserializer = new JsonDeserializer<>(); + return deserializer; + } + + @Bean + Map defaultProducerProperties(ChatBackendProperties chatBackendProperties) + { + return Map.of( + ProducerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientId(), + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + chatBackendProperties.getKafka().getBootstrapServers()); + } + + @Bean + Map defaultConsumerProperties(ChatBackendProperties chatBackendProperties) + { + return Map.of( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + chatBackendProperties.getKafka().getBootstrapServers(), + ConsumerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientId(), + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, + "false", + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + "earliest"); } @Bean