package de.juplo.kafka.chat.backend.persistence.kafka;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
+import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
-import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
-import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory;
-import org.springframework.boot.ApplicationRunner;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
import java.time.Clock;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
@ConditionalOnProperty(
prefix = "chat.backend",
name = "services",
- havingValue = "inmemory",
- matchIfMissing = true)
+ havingValue = "kafka")
@Configuration
-public class KafkaServicesConfiguration implements ApplicationRunner
+public class KafkaServicesConfiguration
{
@Bean
ChatHome kafkaChatHome(
- ChatBackendProperties properties,
- InMemoryChatHomeService chatHomeService,
- StorageStrategy storageStrategy)
+ ShardingStrategy shardingStrategy,
+ ChatMessageChannel chatMessageChannel)
{
- int numShards = properties.getInmemory().getNumShards();
- SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
- ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
- return new ShardedChatHome(chatHomes, strategy);
+ return new KafkaChatHome(shardingStrategy, chatMessageChannel);
}
@Bean
- KafkaChatHomeService kafkaChatHomeService(ChatBackendProperties properties)
+ KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
{
- ShardingStrategyType sharding =
- properties.getInmemory().getShardingStrategy();
- int numShards = sharding == ShardingStrategyType.none
- ? 1
- : properties.getInmemory().getNumShards();
- int[] ownedShards = sharding == ShardingStrategyType.none
- ? new int[] { 0 }
- : properties.getInmemory().getOwnedShards();
- return new InMemoryChatHomeService(
- numShards,
- ownedShards,
- storageStrategy.read());
+ return new KafkaChatRoomFactory(chatRoomChannel);
}
@Bean
- InMemoryChatRoomFactory chatRoomFactory(
- InMemoryChatHomeService service,
- ShardingStrategy strategy,
- Clock clock,
- ChatBackendProperties properties)
+ ChatRoomChannel chatRoomChannel(
+ ChatBackendProperties properties,
+ Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
+ Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
+ ShardingStrategy shardingStrategy,
+ ChatMessageChannel chatMessageChannel,
+ Clock clock)
{
- return new InMemoryChatRoomFactory(
- service,
- strategy,
+ return new ChatRoomChannel(
+ properties.getKafka().getTopic(),
+ chatRoomChannelProducer,
+ chatRoomChannelConsumer,
+ shardingStrategy,
+ chatMessageChannel,
clock,
properties.getChatroomBufferSize());
}
@Bean
- ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
+ Producer<Integer, ChatRoomTo> chatRoomChannelProducer(
+ Properties defaultProducerProperties,
+ IntegerSerializer integerSerializer,
+ JsonSerializer<ChatRoomTo> chatRoomSerializer)
+ {
+ return new KafkaProducer<>(
+ defaultProducerProperties,
+ integerSerializer,
+ chatRoomSerializer);
+ }
+
+ @Bean
+ IntegerSerializer integerSerializer()
+ {
+ return new IntegerSerializer();
+ }
+
+ @Bean
+ JsonSerializer<ChatRoomTo> chatRoomSerializer()
+ {
+ JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
+ serializer.configure(
+ Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
+ false);
+ return serializer;
+ }
+
+ @Bean
+ Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer(
+ Properties defaultConsumerProperties,
+ IntegerDeserializer integerDeserializer,
+ JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
+ {
+ Map<String, Object> properties = new HashMap<>();
+ defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+ properties.put(
+ ConsumerConfig.GROUP_ID_CONFIG,
+ "chat_room_channel");
+ return new KafkaConsumer<>(
+ properties,
+ integerDeserializer,
+ chatRoomDeserializer);
+ }
+
+ @Bean
+ IntegerDeserializer integerDeserializer()
+ {
+ return new IntegerDeserializer();
+ }
+
+ @Bean
+ JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
+ {
+ JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
+ deserializer.configure(
+ Map.of(
+ JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
+ JsonDeserializer.VALUE_DEFAULT_TYPE, ChatRoomTo.class,
+ JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
+ false );
+ return deserializer;
+ }
+
+ @Bean
+ ShardingStrategy shardingStrategy(ChatBackendProperties properties)
{
- return new KafkaLikeShardingStrategy(
+ return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
+ }
+
+ @Bean
+ ChatMessageChannel chatMessageChannel(
+ ChatBackendProperties properties,
+ Producer<String, MessageTo> chatMessageChannelProducer,
+ Consumer<String, MessageTo> chatMessageChannelConsumer,
+ ZoneId zoneId)
+ {
+ return new ChatMessageChannel(
+ properties.getKafka().getTopic(),
+ chatMessageChannelProducer,
+ chatMessageChannelConsumer,
+ zoneId,
properties.getKafka().getNumPartitions());
}
+
+ @Bean
+ Producer<String, MessageTo> chatMessageChannelProducer(
+ Properties defaultProducerProperties,
+ StringSerializer stringSerializer,
+ JsonSerializer<MessageTo> messageSerializer)
+ {
+ return new KafkaProducer<>(
+ defaultProducerProperties,
+ stringSerializer,
+ messageSerializer);
+ }
+
+ @Bean
+ StringSerializer stringSerializer()
+ {
+ return new StringSerializer();
+ }
+
+ @Bean
+ JsonSerializer<MessageTo> chatMessageSerializer()
+ {
+ JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
+ serializer.configure(
+ Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
+ false);
+ return serializer;
+ }
+
+ @Bean
+ Consumer<String, MessageTo> chatMessageChannelConsumer(
+ Properties defaultConsumerProperties,
+ StringDeserializer stringDeserializer,
+ JsonDeserializer<MessageTo> messageDeserializer)
+ {
+ Map<String, Object> properties = new HashMap<>();
+ defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+ properties.put(
+ ConsumerConfig.GROUP_ID_CONFIG,
+ "chat_message_channel");
+ return new KafkaConsumer<>(
+ properties,
+ stringDeserializer,
+ messageDeserializer);
+ }
+
+ @Bean
+ StringDeserializer stringDeserializer()
+ {
+ return new StringDeserializer();
+ }
+
+ @Bean
+ JsonDeserializer<MessageTo> chatMessageDeserializer()
+ {
+ JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
+ deserializer.configure(
+ Map.of(
+ JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
+ JsonDeserializer.VALUE_DEFAULT_TYPE, MessageTo.class,
+ JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
+ false );
+ return deserializer;
+ }
+
+ @Bean
+ Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
+ {
+ Properties properties = new Properties();
+ properties.setProperty(
+ ProducerConfig.CLIENT_ID_CONFIG,
+ chatBackendProperties.getKafka().getClientId());
+ properties.setProperty(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ chatBackendProperties.getKafka().getBootstrapServers());
+ return properties;
+ }
+
+ @Bean
+ Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
+ {
+ Properties properties = new Properties();
+ properties.setProperty(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ chatBackendProperties.getKafka().getBootstrapServers());
+ properties.setProperty(
+ ConsumerConfig.CLIENT_ID_CONFIG,
+ chatBackendProperties.getKafka().getClientId());
+ properties.setProperty(
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+ "false");
+ properties.setProperty(
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+ "earliest");
+ return properties;
+ }
+
+ @Bean
+ ZoneId zoneId()
+ {
+ return ZoneId.systemDefault();
+ }
}