From b810ed4e98a9a32c01419fa5fca2555f0f22cafc Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 17 Nov 2024 13:06:26 +0100 Subject: [PATCH] Handling der Nachricht in das Interface `RecordHandler` verlegt --- .../de/juplo/kafka/ApplicationConfiguration.java | 10 ++++++++++ .../java/de/juplo/kafka/ExampleConsumer.java | 16 ++++++++-------- src/main/java/de/juplo/kafka/RecordHandler.java | 11 +++++++++++ 3 files changed, 29 insertions(+), 8 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/RecordHandler.java diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 37f0bf6..107c342 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,5 +1,6 @@ package de.juplo.kafka; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.StickyAssignor; @@ -15,11 +16,13 @@ import java.util.Properties; @Configuration @EnableConfigurationProperties(ApplicationProperties.class) +@Slf4j public class ApplicationConfiguration { @Bean public ExampleConsumer exampleConsumer( Consumer kafkaConsumer, + RecordHandler recordHandler, ApplicationProperties properties, ConfigurableApplicationContext applicationContext) { @@ -28,9 +31,16 @@ public class ApplicationConfiguration properties.getClientId(), properties.getConsumerProperties().getTopic(), kafkaConsumer, + recordHandler, () -> applicationContext.close()); } + @Bean + public RecordHandler recordHandler(ApplicationProperties properties) + { + return (topic, partition, offset, key, value) -> log.info("No-Ops Handler called for {}={}", key, value); + } + @Bean(destroyMethod = "") public KafkaConsumer kafkaConsumer(ApplicationProperties properties) { diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index bb0b619..d9e6c47 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; @@ -17,6 +18,7 @@ public class ExampleConsumer implements Runnable private final String id; private final String topic; private final Consumer consumer; + private final RecordHandler recordHandler; private final Thread workerThread; private final Runnable closeCallback; @@ -28,11 +30,13 @@ public class ExampleConsumer implements Runnable String clientId, String topic, Consumer consumer, + RecordHandler recordHandler, Runnable closeCallback) { this.id = clientId; this.topic = topic; this.consumer = consumer; + this.recordHandler = recordHandler; workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); workerThread.start(); @@ -68,15 +72,10 @@ public class ExampleConsumer implements Runnable record.value()); } } - catch (RecordDeserializationException e) + catch(RecordDeserializationException e) { - log.error( - "{} - Ignoring invalid record for offset {} on partition {}: {}", - id, - e.offset(), - e.topicPartition(), - e.getMessage()); - consumer.seek(e.topicPartition(), e.offset() + 1); + log.error("{} - Deserialization Exception {}:{}", id, e.topicPartition(), e.offset()); + consumer.seek(e.topicPartition(), e.offset() +1); } } } @@ -108,6 +107,7 @@ public class ExampleConsumer implements Runnable { consumed++; log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value); + recordHandler.handleRecord(topic, partition, offset, key, value); } diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java new file mode 100644 index 0000000..a7b65af --- /dev/null +++ b/src/main/java/de/juplo/kafka/RecordHandler.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +public interface RecordHandler +{ + void handleRecord( + String topic, + Integer partition, + Long offset, + K key, + V value); +} -- 2.20.1