X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FKafkaServicesConfiguration.java;h=9e1f75e7856eee91888967b8d06c507c9047311e;hb=1416ccc8a9eae999201dbf7c77c4d4906fc9fc24;hp=4350779234d2031c8d69178b989c0bd967450a86;hpb=e400c437b999e1e92f2a52e3ef72306a9107aff9;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index 43507792..9e1f75e7 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -51,8 +51,8 @@ public class KafkaServicesConfiguration @Bean ChatRoomChannel chatRoomChannel( ChatBackendProperties properties, - Producer chatRoomChannelProducer, - Consumer chatRoomChannelConsumer, + Producer chatRoomChannelProducer, + Consumer chatRoomChannelConsumer, ShardingStrategy shardingStrategy, ChatMessageChannel chatMessageChannel, Clock clock) @@ -68,11 +68,11 @@ public class KafkaServicesConfiguration } @Bean - Producer chatRoomChannelProducer( + Producer chatRoomChannelProducer( Properties defaultProducerProperties, ChatBackendProperties chatBackendProperties, IntegerSerializer integerSerializer, - JsonSerializer chatRoomSerializer) + JsonSerializer chatRoomSerializer) { Map properties = new HashMap<>(); defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); @@ -92,9 +92,9 @@ public class KafkaServicesConfiguration } @Bean - JsonSerializer chatRoomSerializer() + JsonSerializer chatRoomSerializer() { - JsonSerializer serializer = new JsonSerializer<>(); + JsonSerializer serializer = new JsonSerializer<>(); serializer.configure( Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false), false); @@ -102,11 +102,11 @@ public class KafkaServicesConfiguration } @Bean - Consumer chatRoomChannelConsumer( + Consumer chatRoomChannelConsumer( Properties defaultConsumerProperties, ChatBackendProperties chatBackendProperties, IntegerDeserializer integerDeserializer, - JsonDeserializer chatRoomDeserializer) + JsonDeserializer chatRoomDeserializer) { Map properties = new HashMap<>(); defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); @@ -129,13 +129,13 @@ public class KafkaServicesConfiguration } @Bean - JsonDeserializer chatRoomDeserializer() + JsonDeserializer chatRoomDeserializer() { - JsonDeserializer deserializer = new JsonDeserializer<>(); + JsonDeserializer deserializer = new JsonDeserializer<>(); deserializer.configure( Map.of( JsonDeserializer.USE_TYPE_INFO_HEADERS, false, - JsonDeserializer.VALUE_DEFAULT_TYPE, ChatRoomTo.class, + JsonDeserializer.VALUE_DEFAULT_TYPE, CreateChatRoomRequestTo.class, JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()), false ); return deserializer; @@ -150,8 +150,8 @@ public class KafkaServicesConfiguration @Bean ChatMessageChannel chatMessageChannel( ChatBackendProperties properties, - Producer chatMessageChannelProducer, - Consumer chatMessageChannelConsumer, + Producer chatMessageChannelProducer, + Consumer chatMessageChannelConsumer, ZoneId zoneId) { return new ChatMessageChannel( @@ -163,11 +163,11 @@ public class KafkaServicesConfiguration } @Bean - Producer chatMessageChannelProducer( + Producer chatMessageChannelProducer( Properties defaultProducerProperties, ChatBackendProperties chatBackendProperties, StringSerializer stringSerializer, - JsonSerializer messageSerializer) + JsonSerializer messageSerializer) { Map properties = new HashMap<>(); defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); @@ -187,21 +187,23 @@ public class KafkaServicesConfiguration } @Bean - JsonSerializer chatMessageSerializer() + JsonSerializer chatMessageSerializer() { - JsonSerializer serializer = new JsonSerializer<>(); + JsonSerializer serializer = new JsonSerializer<>(); serializer.configure( - Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false), + Map.of(JsonSerializer.TYPE_MAPPINGS, + "create:" + CreateChatRoomRequestTo.class.getCanonicalName() + "," + + "message:" + ChatMessageTo.class.getCanonicalName()), false); return serializer; } @Bean - Consumer chatMessageChannelConsumer( + Consumer chatMessageChannelConsumer( Properties defaultConsumerProperties, ChatBackendProperties chatBackendProperties, StringDeserializer stringDeserializer, - JsonDeserializer messageDeserializer) + JsonDeserializer messageDeserializer) { Map properties = new HashMap<>(); defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); @@ -224,13 +226,13 @@ public class KafkaServicesConfiguration } @Bean - JsonDeserializer chatMessageDeserializer() + JsonDeserializer chatMessageDeserializer() { - JsonDeserializer deserializer = new JsonDeserializer<>(); + JsonDeserializer deserializer = new JsonDeserializer<>(); deserializer.configure( Map.of( JsonDeserializer.USE_TYPE_INFO_HEADERS, false, - JsonDeserializer.VALUE_DEFAULT_TYPE, MessageTo.class, + JsonDeserializer.VALUE_DEFAULT_TYPE, ChatMessageTo.class, JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()), false ); return deserializer;