From 9259b912d4294b7b21b49536ed1cc72175e1b2eb Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 9 Apr 2022 13:40:47 +0200 Subject: [PATCH] =?utf8?q?Refaktorisierung=20f=C3=BCr=20Tests=20-=20Record?= =?utf8?q?-Handler=20als=20Bean=20konfigurierbar?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Ein Handler für die Verarbeitung der einzelnen ConsumerRecord's kann jetzt ein `java.util.function.Consumer` als Bean definiert werden * Die Nachricht wird erst nach dem Aufruf des Handlers als konsumiert gezählt, damit der Handler die Nachricht über eine Exception ablehnen kann. --- .../de/juplo/kafka/ApplicationConfiguration.java | 15 ++++++++++++++- src/main/java/de/juplo/kafka/EndlessConsumer.java | 6 +++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 76d0c8a..f228d85 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,5 +1,6 @@ package de.juplo.kafka; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -10,16 +11,27 @@ import org.springframework.context.annotation.Configuration; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.function.Consumer; @Configuration @EnableConfigurationProperties(ApplicationProperties.class) public class ApplicationConfiguration { + @Bean + public Consumer> consumer() + { + return (record) -> + { + // Handle record + }; + } + @Bean public EndlessConsumer endlessConsumer( KafkaConsumer kafkaConsumer, ExecutorService executor, + Consumer> handler, ApplicationProperties properties) { return @@ -27,7 +39,8 @@ public class ApplicationConfiguration executor, properties.getClientId(), properties.getTopic(), - kafkaConsumer); + kafkaConsumer, + handler); } @Bean diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 0bf5925..38dd360 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -24,6 +24,7 @@ public class EndlessConsumer implements Runnable private final String id; private final String topic; private final Consumer consumer; + private final java.util.function.Consumer> handler; private final Lock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); @@ -94,7 +95,6 @@ public class EndlessConsumer implements Runnable log.info("{} - Received {} messages", id, records.count()); for (ConsumerRecord record : records) { - consumed++; log.info( "{} - {}: {}/{} - {}={}", id, @@ -105,6 +105,10 @@ public class EndlessConsumer implements Runnable record.value() ); + handler.accept(record); + + consumed++; + Integer partition = record.partition(); String key = record.key() == null ? "NULL" : record.key(); Map byKey = seen.get(partition); -- 2.20.1