From 7eafa4662087f46dcb26e4c109e5bf06f5cbfcab Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 20 Apr 2023 11:04:41 +0200 Subject: [PATCH] NEU --- .../kafka/KafkaServicesConfiguration.java | 43 ++++++++----------- 1 file changed, 18 insertions(+), 25 deletions(-) diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index b7eb711a..3b4bc164 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -22,7 +22,8 @@ import org.springframework.kafka.support.serializer.JsonSerializer; import java.time.Clock; import java.time.ZoneId; -import java.util.Properties; +import java.util.HashMap; +import java.util.Map; @ConditionalOnProperty( @@ -67,7 +68,7 @@ public class KafkaServicesConfiguration @Bean Producer chatRoomChannelProducer( - Properties defaultProducerProperties, + Map defaultProducerProperties, IntegerSerializer integerSerializer, JsonSerializer chatRoomSerializer) { @@ -92,12 +93,12 @@ public class KafkaServicesConfiguration @Bean Consumer chatRoomChannelConsumer( - Properties defaultConsumerProperties, + Map defaultConsumerProperties, IntegerDeserializer integerDeserializer, JsonDeserializer chatRoomDeserializer) { - Properties properties = new Properties(defaultConsumerProperties); - properties.setProperty( + Map properties = new HashMap<>(); + properties.put( ConsumerConfig.GROUP_ID_CONFIG, "chat_room_channel"); return new KafkaConsumer<>( @@ -142,7 +143,7 @@ public class KafkaServicesConfiguration @Bean Producer chatMessageChannelProducer( - Properties defaultProducerProperties, + Map defaultProducerProperties, StringSerializer stringSerializer, JsonSerializer messageSerializer) { @@ -167,12 +168,12 @@ public class KafkaServicesConfiguration @Bean Consumer chatMessageChannelConsumer( - Properties defaultConsumerProperties, + Map defaultConsumerProperties, StringDeserializer stringDeserializer, JsonDeserializer messageDeserializer) { - Properties properties = new Properties(defaultConsumerProperties); - properties.setProperty( + Map properties = new HashMap<>(); + properties.put( ConsumerConfig.GROUP_ID_CONFIG, "chat_message_channel"); return new KafkaConsumer<>( @@ -195,35 +196,27 @@ public class KafkaServicesConfiguration } @Bean - Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties) + Map defaultProducerProperties(ChatBackendProperties chatBackendProperties) { - Properties properties = new Properties(); - properties.setProperty( + return Map.of( ProducerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientId()); - properties.setProperty( + chatBackendProperties.getKafka().getClientId(), ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, chatBackendProperties.getKafka().getBootstrapServers()); - return properties; } @Bean - Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties) + Map defaultConsumerProperties(ChatBackendProperties chatBackendProperties) { - Properties properties = new Properties(); - properties.setProperty( + return Map.of( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - chatBackendProperties.getKafka().getBootstrapServers()); - properties.setProperty( + chatBackendProperties.getKafka().getBootstrapServers(), ConsumerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientId()); - properties.setProperty( + chatBackendProperties.getKafka().getClientId(), ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, - "false"); - properties.setProperty( + "false", ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - return properties; } @Bean -- 2.20.1