NEU
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaServicesConfiguration.java
index 55aa6f8..b0e7776 100644 (file)
@@ -1,23 +1,30 @@
 package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
 import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
-import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
-import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
-import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
-import org.springframework.boot.ApplicationRunner;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
 
 import java.time.Clock;
 import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
 
 
 @ConditionalOnProperty(
@@ -25,7 +32,7 @@ import java.time.ZoneId;
     name = "services",
     havingValue = "kafka")
 @Configuration
-public class KafkaServicesConfiguration implements ApplicationRunner
+public class KafkaServicesConfiguration
 {
   @Bean
   ChatHome kafkaChatHome(
@@ -60,6 +67,86 @@ public class KafkaServicesConfiguration implements ApplicationRunner
         properties.getChatroomBufferSize());
   }
 
+  @Bean
+  Producer<Integer, ChatRoomTo>  chatRoomChannelProducer(
+      Properties defaultProducerProperties,
+      ChatBackendProperties chatBackendProperties,
+      IntegerSerializer integerSerializer,
+      JsonSerializer<ChatRoomTo> chatRoomSerializer)
+  {
+    Map<String, Object> properties = new HashMap<>();
+    defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+    properties.put(
+        ProducerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
+    return new KafkaProducer<>(
+        properties,
+        integerSerializer,
+        chatRoomSerializer);
+  }
+
+  @Bean
+  IntegerSerializer integerSerializer()
+  {
+    return new IntegerSerializer();
+  }
+
+  @Bean
+  JsonSerializer<ChatRoomTo> chatRoomSerializer()
+  {
+    JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
+    serializer.configure(
+        Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
+        false);
+    return serializer;
+  }
+
+  @Bean
+  Consumer<Integer, ChatRoomTo>  chatRoomChannelConsumer(
+      Properties defaultConsumerProperties,
+      ChatBackendProperties chatBackendProperties,
+      IntegerDeserializer integerDeserializer,
+      JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
+  {
+    Map<String, Object> properties = new HashMap<>();
+    defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+    properties.put(
+        ConsumerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER");
+    properties.put(
+        ConsumerConfig.GROUP_ID_CONFIG,
+        "chat_room_channel");
+    return new KafkaConsumer<>(
+        properties,
+        integerDeserializer,
+        chatRoomDeserializer);
+  }
+
+  @Bean
+  IntegerDeserializer integerDeserializer()
+  {
+    return new IntegerDeserializer();
+  }
+
+  @Bean
+  JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
+  {
+    JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
+    deserializer.configure(
+        Map.of(
+            JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
+            JsonDeserializer.VALUE_DEFAULT_TYPE, ChatRoomTo.class,
+            JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
+        false );
+    return deserializer;
+  }
+
+  @Bean
+  ShardingStrategy shardingStrategy(ChatBackendProperties properties)
+  {
+    return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
+  }
+
   @Bean
   ChatMessageChannel chatMessageChannel(
       ChatBackendProperties properties,
@@ -76,9 +163,106 @@ public class KafkaServicesConfiguration implements ApplicationRunner
   }
 
   @Bean
-  ShardingStrategy shardingStrategy(ChatBackendProperties properties)
+  Producer<String, MessageTo>  chatMessageChannelProducer(
+      Properties defaultProducerProperties,
+      ChatBackendProperties chatBackendProperties,
+      StringSerializer stringSerializer,
+      JsonSerializer<MessageTo> messageSerializer)
   {
-    return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
+    Map<String, Object> properties = new HashMap<>();
+    defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+    properties.put(
+        ProducerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_PRODUCER");
+    return new KafkaProducer<>(
+        properties,
+        stringSerializer,
+        messageSerializer);
+  }
+
+  @Bean
+  StringSerializer stringSerializer()
+  {
+    return new StringSerializer();
+  }
+
+  @Bean
+  JsonSerializer<MessageTo> chatMessageSerializer()
+  {
+    JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
+    serializer.configure(
+        Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
+        false);
+    return serializer;
+  }
+
+  @Bean
+  Consumer<String, MessageTo>  chatMessageChannelConsumer(
+      Properties defaultConsumerProperties,
+      ChatBackendProperties chatBackendProperties,
+      StringDeserializer stringDeserializer,
+      JsonDeserializer<MessageTo> messageDeserializer)
+  {
+    Map<String, Object> properties = new HashMap<>();
+    defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+    properties.put(
+        ConsumerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER");
+    properties.put(
+        ConsumerConfig.GROUP_ID_CONFIG,
+        "chat_message_channel");
+    return new KafkaConsumer<>(
+        properties,
+        stringDeserializer,
+        messageDeserializer);
+  }
+
+  @Bean
+  StringDeserializer stringDeserializer()
+  {
+    return new StringDeserializer();
+  }
+
+  @Bean
+  JsonDeserializer<MessageTo> chatMessageDeserializer()
+  {
+    JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
+    deserializer.configure(
+        Map.of(
+            JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
+            JsonDeserializer.VALUE_DEFAULT_TYPE, MessageTo.class,
+            JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
+        false );
+    return deserializer;
+  }
+
+  @Bean
+  Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
+  {
+    Properties properties = new Properties();
+    properties.setProperty(
+        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+        chatBackendProperties.getKafka().getBootstrapServers());
+    return properties;
+  }
+
+  @Bean
+  Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
+  {
+    Properties properties = new Properties();
+    properties.setProperty(
+        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+        chatBackendProperties.getKafka().getBootstrapServers());
+    properties.setProperty(
+        ConsumerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientIdPrefix());
+    properties.setProperty(
+        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+        "false");
+    properties.setProperty(
+        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+        "earliest");
+    return properties;
   }
 
   @Bean