From a574411f66c4e76fa547e004912716e37a5f2a01 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 20 Apr 2023 10:13:52 +0200 Subject: [PATCH] NEU --- .../chat/backend/ChatBackendProperties.java | 2 + .../kafka/KafkaServicesConfiguration.java | 48 +++++++++++++++---- 2 files changed, 40 insertions(+), 10 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index 4724f6b5..4d8f18d5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -34,6 +34,8 @@ public class ChatBackendProperties @Setter public static class KafkaServicesProperties { + private String clientId; + private String bootstrapServers = ":9092"; private String topic = "test"; private int numPartitions = 2; } diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index b11babc7..efc86b9e 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -7,9 +7,11 @@ import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -126,12 +128,12 @@ public class KafkaServicesConfiguration implements ApplicationRunner @Bean Producer chatRoomChannelProducer( - Properties producerProperties, + Properties defaultProducerProperties, IntegerSerializer integerSerializer, JsonSerializer chatRoomSerializer) { return new KafkaProducer<>( - producerProperties, + defaultProducerProperties, integerSerializer, chatRoomSerializer); } @@ -151,12 +153,16 @@ public class KafkaServicesConfiguration implements ApplicationRunner @Bean Consumer chatRoomChannelConsumer( - Properties producerProperties, + Properties defaultConsumerProperties, IntegerDeserializer integerDeserializer, JsonDeserializer chatRoomDeserializer) { + Properties properties = new Properties(defaultConsumerProperties); + properties.setProperty( + ConsumerConfig.GROUP_ID_CONFIG, + "chat_room_channel"); return new KafkaConsumer<>( - producerProperties, + properties, integerDeserializer, chatRoomDeserializer); } @@ -197,12 +203,12 @@ public class KafkaServicesConfiguration implements ApplicationRunner @Bean Producer chatMessageChannelProducer( - Properties producerProperties, + Properties defaultProducerProperties, StringSerializer stringSerializer, JsonSerializer messageSerializer) { return new KafkaProducer<>( - producerProperties, + defaultProducerProperties, stringSerializer, messageSerializer); } @@ -222,12 +228,16 @@ public class KafkaServicesConfiguration implements ApplicationRunner @Bean Consumer chatMessageChannelConsumer( - Properties producerProperties, + Properties defaultConsumerProperties, StringDeserializer stringDeserializer, JsonDeserializer messageDeserializer) { + Properties properties = new Properties(defaultConsumerProperties); + properties.setProperty( + ConsumerConfig.GROUP_ID_CONFIG, + "chat_message_channel"); return new KafkaConsumer<>( - producerProperties, + properties, stringDeserializer, messageDeserializer); } @@ -246,16 +256,34 @@ public class KafkaServicesConfiguration implements ApplicationRunner } @Bean - Properties producerProperties(ChatBackendProperties chatBackendProperties) + Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties) { Properties properties = new Properties(); + properties.setProperty( + ProducerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientId()); + properties.setProperty( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + chatBackendProperties.getKafka().getBootstrapServers()); return properties; } @Bean - Properties consumerProperties(ChatBackendProperties chatBackendProperties) + Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties) { Properties properties = new Properties(); + properties.setProperty( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + chatBackendProperties.getKafka().getBootstrapServers()); + properties.setProperty( + ConsumerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientId()); + properties.setProperty( + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, + "false"); + properties.setProperty( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + "earliest"); return properties; } -- 2.20.1