X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FKafkaServicesConfiguration.java;h=4350779234d2031c8d69178b989c0bd967450a86;hb=832c8ed50217ce40734ca9c5e326263f89567177;hp=c157cce3779cdb8feb4f95537bfbf27b310702ea;hpb=5b0f5543b2d514baba0fd020adffe410e4b64d4e;p=demos%2Fkafka%2Fchat diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index c157cce3..43507792 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -24,6 +24,7 @@ import java.time.Clock; import java.time.ZoneId; import java.util.HashMap; import java.util.Map; +import java.util.Properties; @ConditionalOnProperty( @@ -57,7 +58,7 @@ public class KafkaServicesConfiguration Clock clock) { return new ChatRoomChannel( - properties.getKafka().getTopic(), + properties.getKafka().getChatroomChannelTopic(), chatRoomChannelProducer, chatRoomChannelConsumer, shardingStrategy, @@ -68,12 +69,18 @@ public class KafkaServicesConfiguration @Bean Producer chatRoomChannelProducer( - Map defaultProducerProperties, + Properties defaultProducerProperties, + ChatBackendProperties chatBackendProperties, IntegerSerializer integerSerializer, JsonSerializer chatRoomSerializer) { + Map properties = new HashMap<>(); + defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ProducerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER"); return new KafkaProducer<>( - defaultProducerProperties, + properties, integerSerializer, chatRoomSerializer); } @@ -88,16 +95,24 @@ public class KafkaServicesConfiguration JsonSerializer chatRoomSerializer() { JsonSerializer serializer = new JsonSerializer<>(); + serializer.configure( + Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false), + false); return serializer; } @Bean Consumer chatRoomChannelConsumer( - Map defaultConsumerProperties, + Properties defaultConsumerProperties, + ChatBackendProperties chatBackendProperties, IntegerDeserializer integerDeserializer, JsonDeserializer chatRoomDeserializer) { - Map properties = new HashMap<>(defaultConsumerProperties); + Map properties = new HashMap<>(); + defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ConsumerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER"); properties.put( ConsumerConfig.GROUP_ID_CONFIG, "chat_room_channel"); @@ -117,6 +132,12 @@ public class KafkaServicesConfiguration JsonDeserializer chatRoomDeserializer() { JsonDeserializer deserializer = new JsonDeserializer<>(); + deserializer.configure( + Map.of( + JsonDeserializer.USE_TYPE_INFO_HEADERS, false, + JsonDeserializer.VALUE_DEFAULT_TYPE, ChatRoomTo.class, + JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()), + false ); return deserializer; } @@ -134,7 +155,7 @@ public class KafkaServicesConfiguration ZoneId zoneId) { return new ChatMessageChannel( - properties.getKafka().getTopic(), + properties.getKafka().getMessageChannelTopic(), chatMessageChannelProducer, chatMessageChannelConsumer, zoneId, @@ -143,12 +164,18 @@ public class KafkaServicesConfiguration @Bean Producer chatMessageChannelProducer( - Map defaultProducerProperties, + Properties defaultProducerProperties, + ChatBackendProperties chatBackendProperties, StringSerializer stringSerializer, JsonSerializer messageSerializer) { + Map properties = new HashMap<>(); + defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ProducerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_PRODUCER"); return new KafkaProducer<>( - defaultProducerProperties, + properties, stringSerializer, messageSerializer); } @@ -163,16 +190,24 @@ public class KafkaServicesConfiguration JsonSerializer chatMessageSerializer() { JsonSerializer serializer = new JsonSerializer<>(); + serializer.configure( + Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false), + false); return serializer; } @Bean Consumer chatMessageChannelConsumer( - Map defaultConsumerProperties, + Properties defaultConsumerProperties, + ChatBackendProperties chatBackendProperties, StringDeserializer stringDeserializer, JsonDeserializer messageDeserializer) { - Map properties = new HashMap<>(defaultConsumerProperties); + Map properties = new HashMap<>(); + defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ConsumerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER"); properties.put( ConsumerConfig.GROUP_ID_CONFIG, "chat_message_channel"); @@ -192,31 +227,39 @@ public class KafkaServicesConfiguration JsonDeserializer chatMessageDeserializer() { JsonDeserializer deserializer = new JsonDeserializer<>(); + deserializer.configure( + Map.of( + JsonDeserializer.USE_TYPE_INFO_HEADERS, false, + JsonDeserializer.VALUE_DEFAULT_TYPE, MessageTo.class, + JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()), + false ); return deserializer; } @Bean - Map defaultProducerProperties(ChatBackendProperties chatBackendProperties) + Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties) { - return Map.of( - ProducerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientId(), + Properties properties = new Properties(); + properties.setProperty( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, chatBackendProperties.getKafka().getBootstrapServers()); + return properties; } @Bean - Map defaultConsumerProperties(ChatBackendProperties chatBackendProperties) + Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties) { - return Map.of( + Properties properties = new Properties(); + properties.setProperty( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - chatBackendProperties.getKafka().getBootstrapServers(), - ConsumerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientId(), + chatBackendProperties.getKafka().getBootstrapServers()); + properties.setProperty( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, - "false", + "false"); + properties.setProperty( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return properties; } @Bean