NEU
authorKai Moritz <kai@juplo.de>
Thu, 20 Apr 2023 08:13:52 +0000 (10:13 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 20 Apr 2023 08:13:52 +0000 (10:13 +0200)
src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java

index 4724f6b..4d8f18d 100644 (file)
@@ -34,6 +34,8 @@ public class ChatBackendProperties
   @Setter
   public static class KafkaServicesProperties
   {
+    private String clientId;
+    private String bootstrapServers = ":9092";
     private String topic = "test";
     private int numPartitions = 2;
   }
index b11babc..efc86b9 100644 (file)
@@ -7,9 +7,11 @@ import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
 import jakarta.annotation.PreDestroy;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -126,12 +128,12 @@ public class KafkaServicesConfiguration implements ApplicationRunner
 
   @Bean
   Producer<Integer, ChatRoomTo>  chatRoomChannelProducer(
-      Properties producerProperties,
+      Properties defaultProducerProperties,
       IntegerSerializer integerSerializer,
       JsonSerializer<ChatRoomTo> chatRoomSerializer)
   {
     return new KafkaProducer<>(
-        producerProperties,
+        defaultProducerProperties,
         integerSerializer,
         chatRoomSerializer);
   }
@@ -151,12 +153,16 @@ public class KafkaServicesConfiguration implements ApplicationRunner
 
   @Bean
   Consumer<Integer, ChatRoomTo>  chatRoomChannelConsumer(
-      Properties producerProperties,
+      Properties defaultConsumerProperties,
       IntegerDeserializer integerDeserializer,
       JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
   {
+    Properties properties = new Properties(defaultConsumerProperties);
+    properties.setProperty(
+        ConsumerConfig.GROUP_ID_CONFIG,
+        "chat_room_channel");
     return new KafkaConsumer<>(
-        producerProperties,
+        properties,
         integerDeserializer,
         chatRoomDeserializer);
   }
@@ -197,12 +203,12 @@ public class KafkaServicesConfiguration implements ApplicationRunner
 
   @Bean
   Producer<String, MessageTo>  chatMessageChannelProducer(
-      Properties producerProperties,
+      Properties defaultProducerProperties,
       StringSerializer stringSerializer,
       JsonSerializer<MessageTo> messageSerializer)
   {
     return new KafkaProducer<>(
-        producerProperties,
+        defaultProducerProperties,
         stringSerializer,
         messageSerializer);
   }
@@ -222,12 +228,16 @@ public class KafkaServicesConfiguration implements ApplicationRunner
 
   @Bean
   Consumer<String, MessageTo>  chatMessageChannelConsumer(
-      Properties producerProperties,
+      Properties defaultConsumerProperties,
       StringDeserializer stringDeserializer,
       JsonDeserializer<MessageTo> messageDeserializer)
   {
+    Properties properties = new Properties(defaultConsumerProperties);
+    properties.setProperty(
+        ConsumerConfig.GROUP_ID_CONFIG,
+        "chat_message_channel");
     return new KafkaConsumer<>(
-        producerProperties,
+        properties,
         stringDeserializer,
         messageDeserializer);
   }
@@ -246,16 +256,34 @@ public class KafkaServicesConfiguration implements ApplicationRunner
   }
 
   @Bean
-  Properties producerProperties(ChatBackendProperties chatBackendProperties)
+  Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
   {
     Properties properties = new Properties();
+    properties.setProperty(
+        ProducerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientId());
+    properties.setProperty(
+        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+        chatBackendProperties.getKafka().getBootstrapServers());
     return properties;
   }
 
   @Bean
-  Properties consumerProperties(ChatBackendProperties chatBackendProperties)
+  Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
   {
     Properties properties = new Properties();
+    properties.setProperty(
+        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+        chatBackendProperties.getKafka().getBootstrapServers());
+    properties.setProperty(
+        ConsumerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientId());
+    properties.setProperty(
+        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+        "false");
+    properties.setProperty(
+        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+        "earliest");
     return properties;
   }