NG
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaServicesConfiguration.java
index 4350779..9e1f75e 100644 (file)
@@ -51,8 +51,8 @@ public class KafkaServicesConfiguration
   @Bean
   ChatRoomChannel chatRoomChannel(
       ChatBackendProperties properties,
-      Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
-      Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
+      Producer<Integer, CreateChatRoomRequestTo> chatRoomChannelProducer,
+      Consumer<Integer, CreateChatRoomRequestTo> chatRoomChannelConsumer,
       ShardingStrategy shardingStrategy,
       ChatMessageChannel chatMessageChannel,
       Clock clock)
@@ -68,11 +68,11 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  Producer<Integer, ChatRoomTo>  chatRoomChannelProducer(
+  Producer<Integer, CreateChatRoomRequestTo>  chatRoomChannelProducer(
       Properties defaultProducerProperties,
       ChatBackendProperties chatBackendProperties,
       IntegerSerializer integerSerializer,
-      JsonSerializer<ChatRoomTo> chatRoomSerializer)
+      JsonSerializer<CreateChatRoomRequestTo> chatRoomSerializer)
   {
     Map<String, Object> properties = new HashMap<>();
     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
@@ -92,9 +92,9 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  JsonSerializer<ChatRoomTo> chatRoomSerializer()
+  JsonSerializer<CreateChatRoomRequestTo> chatRoomSerializer()
   {
-    JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
+    JsonSerializer<CreateChatRoomRequestTo> serializer = new JsonSerializer<>();
     serializer.configure(
         Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
         false);
@@ -102,11 +102,11 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  Consumer<Integer, ChatRoomTo>  chatRoomChannelConsumer(
+  Consumer<Integer, CreateChatRoomRequestTo>  chatRoomChannelConsumer(
       Properties defaultConsumerProperties,
       ChatBackendProperties chatBackendProperties,
       IntegerDeserializer integerDeserializer,
-      JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
+      JsonDeserializer<CreateChatRoomRequestTo> chatRoomDeserializer)
   {
     Map<String, Object> properties = new HashMap<>();
     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
@@ -129,13 +129,13 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
+  JsonDeserializer<CreateChatRoomRequestTo> chatRoomDeserializer()
   {
-    JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
+    JsonDeserializer<CreateChatRoomRequestTo> deserializer = new JsonDeserializer<>();
     deserializer.configure(
         Map.of(
             JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
-            JsonDeserializer.VALUE_DEFAULT_TYPE, ChatRoomTo.class,
+            JsonDeserializer.VALUE_DEFAULT_TYPE, CreateChatRoomRequestTo.class,
             JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
         false );
     return deserializer;
@@ -150,8 +150,8 @@ public class KafkaServicesConfiguration
   @Bean
   ChatMessageChannel chatMessageChannel(
       ChatBackendProperties properties,
-      Producer<String, MessageTo> chatMessageChannelProducer,
-      Consumer<String, MessageTo> chatMessageChannelConsumer,
+      Producer<String, AbstractTo> chatMessageChannelProducer,
+      Consumer<String, AbstractTo> chatMessageChannelConsumer,
       ZoneId zoneId)
   {
     return new ChatMessageChannel(
@@ -163,11 +163,11 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  Producer<String, MessageTo>  chatMessageChannelProducer(
+  Producer<String, AbstractTo>  chatMessageChannelProducer(
       Properties defaultProducerProperties,
       ChatBackendProperties chatBackendProperties,
       StringSerializer stringSerializer,
-      JsonSerializer<MessageTo> messageSerializer)
+      JsonSerializer<AbstractTo> messageSerializer)
   {
     Map<String, Object> properties = new HashMap<>();
     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
@@ -187,21 +187,23 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  JsonSerializer<MessageTo> chatMessageSerializer()
+  JsonSerializer<AbstractTo> chatMessageSerializer()
   {
-    JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
+    JsonSerializer<AbstractTo> serializer = new JsonSerializer<>();
     serializer.configure(
-        Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
+        Map.of(JsonSerializer.TYPE_MAPPINGS,
+            "create:" + CreateChatRoomRequestTo.class.getCanonicalName() + "," +
+            "message:" + ChatMessageTo.class.getCanonicalName()),
         false);
     return serializer;
   }
 
   @Bean
-  Consumer<String, MessageTo>  chatMessageChannelConsumer(
+  Consumer<String, ChatMessageTo>  chatMessageChannelConsumer(
       Properties defaultConsumerProperties,
       ChatBackendProperties chatBackendProperties,
       StringDeserializer stringDeserializer,
-      JsonDeserializer<MessageTo> messageDeserializer)
+      JsonDeserializer<ChatMessageTo> messageDeserializer)
   {
     Map<String, Object> properties = new HashMap<>();
     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
@@ -224,13 +226,13 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  JsonDeserializer<MessageTo> chatMessageDeserializer()
+  JsonDeserializer<ChatMessageTo> chatMessageDeserializer()
   {
-    JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
+    JsonDeserializer<ChatMessageTo> deserializer = new JsonDeserializer<>();
     deserializer.configure(
         Map.of(
             JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
-            JsonDeserializer.VALUE_DEFAULT_TYPE, MessageTo.class,
+            JsonDeserializer.VALUE_DEFAULT_TYPE, ChatMessageTo.class,
             JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
         false );
     return deserializer;