From c2a12cbd11d136ba02d26f61c0653a31e90d55db Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 6 May 2025 21:01:56 +0200 Subject: [PATCH] Erzeugung des `KafkaConsumer` in `ApplicationConfiguration` --- .../juplo/kafka/ApplicationConfiguration.java | 26 ++++++++++++++++--- .../java/de/juplo/kafka/ExampleConsumer.java | 15 ++--------- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 76659d4..0c356f1 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,21 +1,41 @@ package de.juplo.kafka; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.util.Properties; + @Configuration @EnableConfigurationProperties(ApplicationProperties.class) public class ApplicationConfiguration { @Bean - public ExampleConsumer exampleConsumer(ApplicationProperties properties) + public ExampleConsumer exampleConsumer( + Consumer kafkaConsumer, + ApplicationProperties properties) { return new ExampleConsumer( - properties.getBroker(), + kafkaConsumer, properties.getTopic(), - properties.getGroupId(), properties.getClientId()); } + + @Bean(destroyMethod = "") + public Consumer kafkaConsumer(ApplicationProperties properties) + { + Properties props = new Properties(); + + props.put("bootstrap.servers", properties.getBroker()); + props.put("group.id", properties.getGroupId()); // ID für die Offset-Commits + props.put("client.id", properties.getClientId()); // Nur zur Wiedererkennung + props.put("key.deserializer", StringDeserializer.class.getName()); + props.put("value.deserializer", StringDeserializer.class.getName()); + + return new KafkaConsumer<>(props); + } } diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 68066fe..8861ccf 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -4,13 +4,10 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Arrays; -import java.util.Properties; @Slf4j @@ -24,21 +21,13 @@ public class ExampleConsumer implements Runnable private long consumed = 0; public ExampleConsumer( - String broker, + Consumer consumer, String topic, - String groupId, String clientId) { - Properties props = new Properties(); - props.put("bootstrap.servers", broker); - props.put("group.id", groupId); // ID für die Offset-Commits - props.put("client.id", clientId); // Nur zur Wiedererkennung - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); - + this.consumer = consumer; this.id = clientId; this.topic = topic; - consumer = new KafkaConsumer<>(props); this.worker = new Thread(this, "ConsumerRunner-" + id); log.info("{} - Starting worker-thread", id); -- 2.20.1