X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FKafkaServicesConfiguration.java;h=4350779234d2031c8d69178b989c0bd967450a86;hb=832c8ed50217ce40734ca9c5e326263f89567177;hp=fd42d9df41c9809876f84cc3f1829aee106fe88c;hpb=915ed8f85459da3c95f86b6351a3d7129668bc8e;p=demos%2Fkafka%2Fchat diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index fd42d9df..43507792 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -1,78 +1,270 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; -import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType; import de.juplo.kafka.chat.backend.domain.ChatHome; -import de.juplo.kafka.chat.backend.domain.ShardedChatHome; import de.juplo.kafka.chat.backend.domain.ShardingStrategy; -import de.juplo.kafka.chat.backend.domain.SimpleChatHome; import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; -import de.juplo.kafka.chat.backend.persistence.StorageStrategy; -import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService; -import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory; -import org.springframework.boot.ApplicationRunner; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerializer; import java.time.Clock; +import java.time.ZoneId; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; @ConditionalOnProperty( prefix = "chat.backend", name = "services", - havingValue = "inmemory", - matchIfMissing = true) + havingValue = "kafka") @Configuration -public class KafkaServicesConfiguration implements ApplicationRunner +public class KafkaServicesConfiguration { @Bean ChatHome kafkaChatHome( - ChatBackendProperties properties, - InMemoryChatHomeService chatHomeService, - StorageStrategy storageStrategy) + ShardingStrategy shardingStrategy, + ChatMessageChannel chatMessageChannel) { - int numShards = properties.getInmemory().getNumShards(); - SimpleChatHome[] chatHomes = new SimpleChatHome[numShards]; - ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); - return new ShardedChatHome(chatHomes, strategy); + return new KafkaChatHome(shardingStrategy, chatMessageChannel); } @Bean - KafkaChatHomeService kafkaChatHomeService(ChatBackendProperties properties) + KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel) { - ShardingStrategyType sharding = - properties.getInmemory().getShardingStrategy(); - int numShards = sharding == ShardingStrategyType.none - ? 1 - : properties.getInmemory().getNumShards(); - int[] ownedShards = sharding == ShardingStrategyType.none - ? new int[] { 0 } - : properties.getInmemory().getOwnedShards(); - return new InMemoryChatHomeService( - numShards, - ownedShards, - storageStrategy.read()); + return new KafkaChatRoomFactory(chatRoomChannel); } @Bean - InMemoryChatRoomFactory chatRoomFactory( - InMemoryChatHomeService service, - ShardingStrategy strategy, - Clock clock, - ChatBackendProperties properties) + ChatRoomChannel chatRoomChannel( + ChatBackendProperties properties, + Producer chatRoomChannelProducer, + Consumer chatRoomChannelConsumer, + ShardingStrategy shardingStrategy, + ChatMessageChannel chatMessageChannel, + Clock clock) { - return new InMemoryChatRoomFactory( - service, - strategy, + return new ChatRoomChannel( + properties.getKafka().getChatroomChannelTopic(), + chatRoomChannelProducer, + chatRoomChannelConsumer, + shardingStrategy, + chatMessageChannel, clock, properties.getChatroomBufferSize()); } @Bean - ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties) + Producer chatRoomChannelProducer( + Properties defaultProducerProperties, + ChatBackendProperties chatBackendProperties, + IntegerSerializer integerSerializer, + JsonSerializer chatRoomSerializer) + { + Map properties = new HashMap<>(); + defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ProducerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER"); + return new KafkaProducer<>( + properties, + integerSerializer, + chatRoomSerializer); + } + + @Bean + IntegerSerializer integerSerializer() + { + return new IntegerSerializer(); + } + + @Bean + JsonSerializer chatRoomSerializer() + { + JsonSerializer serializer = new JsonSerializer<>(); + serializer.configure( + Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false), + false); + return serializer; + } + + @Bean + Consumer chatRoomChannelConsumer( + Properties defaultConsumerProperties, + ChatBackendProperties chatBackendProperties, + IntegerDeserializer integerDeserializer, + JsonDeserializer chatRoomDeserializer) + { + Map properties = new HashMap<>(); + defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ConsumerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER"); + properties.put( + ConsumerConfig.GROUP_ID_CONFIG, + "chat_room_channel"); + return new KafkaConsumer<>( + properties, + integerDeserializer, + chatRoomDeserializer); + } + + @Bean + IntegerDeserializer integerDeserializer() + { + return new IntegerDeserializer(); + } + + @Bean + JsonDeserializer chatRoomDeserializer() + { + JsonDeserializer deserializer = new JsonDeserializer<>(); + deserializer.configure( + Map.of( + JsonDeserializer.USE_TYPE_INFO_HEADERS, false, + JsonDeserializer.VALUE_DEFAULT_TYPE, ChatRoomTo.class, + JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()), + false ); + return deserializer; + } + + @Bean + ShardingStrategy shardingStrategy(ChatBackendProperties properties) { - return new KafkaLikeShardingStrategy( + return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions()); + } + + @Bean + ChatMessageChannel chatMessageChannel( + ChatBackendProperties properties, + Producer chatMessageChannelProducer, + Consumer chatMessageChannelConsumer, + ZoneId zoneId) + { + return new ChatMessageChannel( + properties.getKafka().getMessageChannelTopic(), + chatMessageChannelProducer, + chatMessageChannelConsumer, + zoneId, properties.getKafka().getNumPartitions()); } + + @Bean + Producer chatMessageChannelProducer( + Properties defaultProducerProperties, + ChatBackendProperties chatBackendProperties, + StringSerializer stringSerializer, + JsonSerializer messageSerializer) + { + Map properties = new HashMap<>(); + defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ProducerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_PRODUCER"); + return new KafkaProducer<>( + properties, + stringSerializer, + messageSerializer); + } + + @Bean + StringSerializer stringSerializer() + { + return new StringSerializer(); + } + + @Bean + JsonSerializer chatMessageSerializer() + { + JsonSerializer serializer = new JsonSerializer<>(); + serializer.configure( + Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false), + false); + return serializer; + } + + @Bean + Consumer chatMessageChannelConsumer( + Properties defaultConsumerProperties, + ChatBackendProperties chatBackendProperties, + StringDeserializer stringDeserializer, + JsonDeserializer messageDeserializer) + { + Map properties = new HashMap<>(); + defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ConsumerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER"); + properties.put( + ConsumerConfig.GROUP_ID_CONFIG, + "chat_message_channel"); + return new KafkaConsumer<>( + properties, + stringDeserializer, + messageDeserializer); + } + + @Bean + StringDeserializer stringDeserializer() + { + return new StringDeserializer(); + } + + @Bean + JsonDeserializer chatMessageDeserializer() + { + JsonDeserializer deserializer = new JsonDeserializer<>(); + deserializer.configure( + Map.of( + JsonDeserializer.USE_TYPE_INFO_HEADERS, false, + JsonDeserializer.VALUE_DEFAULT_TYPE, MessageTo.class, + JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()), + false ); + return deserializer; + } + + @Bean + Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties) + { + Properties properties = new Properties(); + properties.setProperty( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + chatBackendProperties.getKafka().getBootstrapServers()); + return properties; + } + + @Bean + Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties) + { + Properties properties = new Properties(); + properties.setProperty( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + chatBackendProperties.getKafka().getBootstrapServers()); + properties.setProperty( + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, + "false"); + properties.setProperty( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + "earliest"); + return properties; + } + + @Bean + ZoneId zoneId() + { + return ZoneId.systemDefault(); + } }