NEU
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaServicesConfiguration.java
index 3b4bc16..4350779 100644 (file)
@@ -24,6 +24,7 @@ import java.time.Clock;
 import java.time.ZoneId;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 
 
 @ConditionalOnProperty(
@@ -57,7 +58,7 @@ public class KafkaServicesConfiguration
       Clock clock)
   {
     return new ChatRoomChannel(
-        properties.getKafka().getTopic(),
+        properties.getKafka().getChatroomChannelTopic(),
         chatRoomChannelProducer,
         chatRoomChannelConsumer,
         shardingStrategy,
@@ -68,12 +69,18 @@ public class KafkaServicesConfiguration
 
   @Bean
   Producer<Integer, ChatRoomTo>  chatRoomChannelProducer(
-      Map<String, String> defaultProducerProperties,
+      Properties defaultProducerProperties,
+      ChatBackendProperties chatBackendProperties,
       IntegerSerializer integerSerializer,
       JsonSerializer<ChatRoomTo> chatRoomSerializer)
   {
+    Map<String, Object> properties = new HashMap<>();
+    defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+    properties.put(
+        ProducerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
     return new KafkaProducer<>(
-        defaultProducerProperties,
+        properties,
         integerSerializer,
         chatRoomSerializer);
   }
@@ -88,16 +95,24 @@ public class KafkaServicesConfiguration
   JsonSerializer<ChatRoomTo> chatRoomSerializer()
   {
     JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
+    serializer.configure(
+        Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
+        false);
     return serializer;
   }
 
   @Bean
   Consumer<Integer, ChatRoomTo>  chatRoomChannelConsumer(
-      Map<String, String> defaultConsumerProperties,
+      Properties defaultConsumerProperties,
+      ChatBackendProperties chatBackendProperties,
       IntegerDeserializer integerDeserializer,
       JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
   {
-    Map<String, String> properties = new HashMap<>();
+    Map<String, Object> properties = new HashMap<>();
+    defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+    properties.put(
+        ConsumerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
     properties.put(
         ConsumerConfig.GROUP_ID_CONFIG,
         "chat_room_channel");
@@ -117,6 +132,12 @@ public class KafkaServicesConfiguration
   JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
   {
     JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
+    deserializer.configure(
+        Map.of(
+            JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
+            JsonDeserializer.VALUE_DEFAULT_TYPE, ChatRoomTo.class,
+            JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
+        false );
     return deserializer;
   }
 
@@ -134,7 +155,7 @@ public class KafkaServicesConfiguration
       ZoneId zoneId)
   {
     return new ChatMessageChannel(
-        properties.getKafka().getTopic(),
+        properties.getKafka().getMessageChannelTopic(),
         chatMessageChannelProducer,
         chatMessageChannelConsumer,
         zoneId,
@@ -143,12 +164,18 @@ public class KafkaServicesConfiguration
 
   @Bean
   Producer<String, MessageTo>  chatMessageChannelProducer(
-      Map<String, String> defaultProducerProperties,
+      Properties defaultProducerProperties,
+      ChatBackendProperties chatBackendProperties,
       StringSerializer stringSerializer,
       JsonSerializer<MessageTo> messageSerializer)
   {
+    Map<String, Object> properties = new HashMap<>();
+    defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+    properties.put(
+        ProducerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_PRODUCER");
     return new KafkaProducer<>(
-        defaultProducerProperties,
+        properties,
         stringSerializer,
         messageSerializer);
   }
@@ -163,16 +190,24 @@ public class KafkaServicesConfiguration
   JsonSerializer<MessageTo> chatMessageSerializer()
   {
     JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
+    serializer.configure(
+        Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
+        false);
     return serializer;
   }
 
   @Bean
   Consumer<String, MessageTo>  chatMessageChannelConsumer(
-      Map<String, String> defaultConsumerProperties,
+      Properties defaultConsumerProperties,
+      ChatBackendProperties chatBackendProperties,
       StringDeserializer stringDeserializer,
       JsonDeserializer<MessageTo> messageDeserializer)
   {
-    Map<String, String> properties = new HashMap<>();
+    Map<String, Object> properties = new HashMap<>();
+    defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+    properties.put(
+        ConsumerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER");
     properties.put(
         ConsumerConfig.GROUP_ID_CONFIG,
         "chat_message_channel");
@@ -192,31 +227,39 @@ public class KafkaServicesConfiguration
   JsonDeserializer<MessageTo> chatMessageDeserializer()
   {
     JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
+    deserializer.configure(
+        Map.of(
+            JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
+            JsonDeserializer.VALUE_DEFAULT_TYPE, MessageTo.class,
+            JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
+        false );
     return deserializer;
   }
 
   @Bean
-  Map<String, String> defaultProducerProperties(ChatBackendProperties chatBackendProperties)
+  Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
   {
-    return Map.of(
-        ProducerConfig.CLIENT_ID_CONFIG,
-        chatBackendProperties.getKafka().getClientId(),
+    Properties properties = new Properties();
+    properties.setProperty(
         ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
         chatBackendProperties.getKafka().getBootstrapServers());
+    return properties;
   }
 
   @Bean
-  Map<String, String> defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
+  Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
   {
-    return Map.of(
+    Properties properties = new Properties();
+    properties.setProperty(
         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-        chatBackendProperties.getKafka().getBootstrapServers(),
-        ConsumerConfig.CLIENT_ID_CONFIG,
-        chatBackendProperties.getKafka().getClientId(),
+        chatBackendProperties.getKafka().getBootstrapServers());
+    properties.setProperty(
         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-        "false",
+        "false");
+    properties.setProperty(
         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
         "earliest");
+    return properties;
   }
 
   @Bean