X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FKafkaServicesConfiguration.java;h=a3dddb1ba26869a473df9c7de8a3e68adacc9023;hb=4cf9f879647735d6f635bc6fb7930486a2cc6d72;hp=45769736eaa8e1c6e498466718e9f3cf88e5e940;hpb=72f89d410a60f693b644fb6f7124f78249bb9c0b;p=demos%2Fkafka%2Fchat diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index 45769736..a3dddb1b 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -70,11 +70,17 @@ public class KafkaServicesConfiguration @Bean Producer chatRoomChannelProducer( Properties defaultProducerProperties, + ChatBackendProperties chatBackendProperties, IntegerSerializer integerSerializer, JsonSerializer chatRoomSerializer) { + Map properties = new HashMap<>(); + defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ProducerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER"); return new KafkaProducer<>( - defaultProducerProperties, + properties, integerSerializer, chatRoomSerializer); } @@ -89,17 +95,24 @@ public class KafkaServicesConfiguration JsonSerializer chatRoomSerializer() { JsonSerializer serializer = new JsonSerializer<>(); + serializer.configure( + Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false), + false); return serializer; } @Bean Consumer chatRoomChannelConsumer( Properties defaultConsumerProperties, + ChatBackendProperties chatBackendProperties, IntegerDeserializer integerDeserializer, JsonDeserializer chatRoomDeserializer) { Map properties = new HashMap<>(); defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ConsumerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER"); properties.put( ConsumerConfig.GROUP_ID_CONFIG, "chat_room_channel"); @@ -119,6 +132,12 @@ public class KafkaServicesConfiguration JsonDeserializer chatRoomDeserializer() { JsonDeserializer deserializer = new JsonDeserializer<>(); + deserializer.configure( + Map.of( + JsonDeserializer.USE_TYPE_INFO_HEADERS, false, + JsonDeserializer.VALUE_DEFAULT_TYPE, ChatRoomTo.class, + JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()), + false ); return deserializer; } @@ -146,11 +165,17 @@ public class KafkaServicesConfiguration @Bean Producer chatMessageChannelProducer( Properties defaultProducerProperties, + ChatBackendProperties chatBackendProperties, StringSerializer stringSerializer, JsonSerializer messageSerializer) { + Map properties = new HashMap<>(); + defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ProducerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_PRODUCER"); return new KafkaProducer<>( - defaultProducerProperties, + properties, stringSerializer, messageSerializer); } @@ -165,17 +190,24 @@ public class KafkaServicesConfiguration JsonSerializer chatMessageSerializer() { JsonSerializer serializer = new JsonSerializer<>(); + serializer.configure( + Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false), + false); return serializer; } @Bean Consumer chatMessageChannelConsumer( Properties defaultConsumerProperties, + ChatBackendProperties chatBackendProperties, StringDeserializer stringDeserializer, JsonDeserializer messageDeserializer) { Map properties = new HashMap<>(); defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ConsumerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER"); properties.put( ConsumerConfig.GROUP_ID_CONFIG, "chat_message_channel"); @@ -195,6 +227,12 @@ public class KafkaServicesConfiguration JsonDeserializer chatMessageDeserializer() { JsonDeserializer deserializer = new JsonDeserializer<>(); + deserializer.configure( + Map.of( + JsonDeserializer.USE_TYPE_INFO_HEADERS, false, + JsonDeserializer.VALUE_DEFAULT_TYPE, MessageTo.class, + JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()), + false ); return deserializer; } @@ -202,9 +240,6 @@ public class KafkaServicesConfiguration Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties) { Properties properties = new Properties(); - properties.setProperty( - ProducerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientId()); properties.setProperty( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, chatBackendProperties.getKafka().getBootstrapServers()); @@ -218,9 +253,6 @@ public class KafkaServicesConfiguration properties.setProperty( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, chatBackendProperties.getKafka().getBootstrapServers()); - properties.setProperty( - ConsumerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientId()); properties.setProperty( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");