From 72f89d410a60f693b644fb6f7124f78249bb9c0b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 20 Apr 2023 11:35:57 +0200 Subject: [PATCH] NEU --- .../kafka/KafkaServicesConfiguration.java | 39 ++++++++++++------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index c157cce3..45769736 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -24,6 +24,7 @@ import java.time.Clock; import java.time.ZoneId; import java.util.HashMap; import java.util.Map; +import java.util.Properties; @ConditionalOnProperty( @@ -68,7 +69,7 @@ public class KafkaServicesConfiguration @Bean Producer chatRoomChannelProducer( - Map defaultProducerProperties, + Properties defaultProducerProperties, IntegerSerializer integerSerializer, JsonSerializer chatRoomSerializer) { @@ -93,11 +94,12 @@ public class KafkaServicesConfiguration @Bean Consumer chatRoomChannelConsumer( - Map defaultConsumerProperties, + Properties defaultConsumerProperties, IntegerDeserializer integerDeserializer, JsonDeserializer chatRoomDeserializer) { - Map properties = new HashMap<>(defaultConsumerProperties); + Map properties = new HashMap<>(); + defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); properties.put( ConsumerConfig.GROUP_ID_CONFIG, "chat_room_channel"); @@ -143,7 +145,7 @@ public class KafkaServicesConfiguration @Bean Producer chatMessageChannelProducer( - Map defaultProducerProperties, + Properties defaultProducerProperties, StringSerializer stringSerializer, JsonSerializer messageSerializer) { @@ -168,11 +170,12 @@ public class KafkaServicesConfiguration @Bean Consumer chatMessageChannelConsumer( - Map defaultConsumerProperties, + Properties defaultConsumerProperties, StringDeserializer stringDeserializer, JsonDeserializer messageDeserializer) { - Map properties = new HashMap<>(defaultConsumerProperties); + Map properties = new HashMap<>(); + defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); properties.put( ConsumerConfig.GROUP_ID_CONFIG, "chat_message_channel"); @@ -196,27 +199,35 @@ public class KafkaServicesConfiguration } @Bean - Map defaultProducerProperties(ChatBackendProperties chatBackendProperties) + Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties) { - return Map.of( + Properties properties = new Properties(); + properties.setProperty( ProducerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientId(), + chatBackendProperties.getKafka().getClientId()); + properties.setProperty( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, chatBackendProperties.getKafka().getBootstrapServers()); + return properties; } @Bean - Map defaultConsumerProperties(ChatBackendProperties chatBackendProperties) + Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties) { - return Map.of( + Properties properties = new Properties(); + properties.setProperty( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - chatBackendProperties.getKafka().getBootstrapServers(), + chatBackendProperties.getKafka().getBootstrapServers()); + properties.setProperty( ConsumerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientId(), + chatBackendProperties.getKafka().getClientId()); + properties.setProperty( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, - "false", + "false"); + properties.setProperty( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return properties; } @Bean -- 2.20.1