Erzeugung des `KafkaConsumer` in `ApplicationConfiguration`
authorKai Moritz <kai@juplo.de>
Tue, 6 May 2025 19:01:56 +0000 (21:01 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 6 May 2025 19:10:37 +0000 (21:10 +0200)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 76659d4..0c356f1 100644 (file)
@@ -1,21 +1,41 @@
 package de.juplo.kafka;
 
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+import java.util.Properties;
+
 
 @Configuration
 @EnableConfigurationProperties(ApplicationProperties.class)
 public class ApplicationConfiguration
 {
   @Bean
-  public ExampleConsumer exampleConsumer(ApplicationProperties properties)
+  public ExampleConsumer exampleConsumer(
+    Consumer<String, String> kafkaConsumer,
+    ApplicationProperties properties)
   {
     return new ExampleConsumer(
-      properties.getBroker(),
+      kafkaConsumer,
       properties.getTopic(),
-      properties.getGroupId(),
       properties.getClientId());
   }
+
+  @Bean(destroyMethod = "")
+  public Consumer<String, String> kafkaConsumer(ApplicationProperties properties)
+  {
+    Properties props = new Properties();
+
+    props.put("bootstrap.servers", properties.getBroker());
+    props.put("group.id", properties.getGroupId()); // ID für die Offset-Commits
+    props.put("client.id", properties.getClientId()); // Nur zur Wiedererkennung
+    props.put("key.deserializer", StringDeserializer.class.getName());
+    props.put("value.deserializer", StringDeserializer.class.getName());
+
+    return new KafkaConsumer<>(props);
+  }
 }
index 68066fe..8861ccf 100644 (file)
@@ -4,13 +4,10 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.StringDeserializer;
 
 import java.time.Duration;
 import java.util.Arrays;
-import java.util.Properties;
 
 
 @Slf4j
@@ -24,21 +21,13 @@ public class ExampleConsumer implements Runnable
   private long consumed = 0;
 
   public ExampleConsumer(
-    String broker,
+    Consumer<String, String> consumer,
     String topic,
-    String groupId,
     String clientId)
   {
-    Properties props = new Properties();
-    props.put("bootstrap.servers", broker);
-    props.put("group.id", groupId); // ID für die Offset-Commits
-    props.put("client.id", clientId); // Nur zur Wiedererkennung
-    props.put("key.deserializer", StringDeserializer.class.getName());
-    props.put("value.deserializer", StringDeserializer.class.getName());
-
+    this.consumer = consumer;
     this.id = clientId;
     this.topic = topic;
-    consumer = new KafkaConsumer<>(props);
 
     this.worker = new Thread(this, "ConsumerRunner-" + id);
     log.info("{} - Starting worker-thread", id);