From: Kai Moritz Date: Sat, 9 Apr 2022 11:40:47 +0000 (+0200) Subject: Refaktorisierung für Tests - Record-Handler als Bean konfigurierbar X-Git-Tag: deserialization-synchroner-test~8 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;ds=sidebyside;h=9259b912d4294b7b21b49536ed1cc72175e1b2eb;p=demos%2Fkafka%2Ftraining Refaktorisierung für Tests - Record-Handler als Bean konfigurierbar * Ein Handler für die Verarbeitung der einzelnen ConsumerRecord's kann jetzt ein `java.util.function.Consumer` als Bean definiert werden * Die Nachricht wird erst nach dem Aufruf des Handlers als konsumiert gezählt, damit der Handler die Nachricht über eine Exception ablehnen kann. --- diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 76d0c8a..f228d85 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,5 +1,6 @@ package de.juplo.kafka; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -10,16 +11,27 @@ import org.springframework.context.annotation.Configuration; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.function.Consumer; @Configuration @EnableConfigurationProperties(ApplicationProperties.class) public class ApplicationConfiguration { + @Bean + public Consumer> consumer() + { + return (record) -> + { + // Handle record + }; + } + @Bean public EndlessConsumer endlessConsumer( KafkaConsumer kafkaConsumer, ExecutorService executor, + Consumer> handler, ApplicationProperties properties) { return @@ -27,7 +39,8 @@ public class ApplicationConfiguration executor, properties.getClientId(), properties.getTopic(), - kafkaConsumer); + kafkaConsumer, + handler); } @Bean diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 0bf5925..38dd360 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -24,6 +24,7 @@ public class EndlessConsumer implements Runnable private final String id; private final String topic; private final Consumer consumer; + private final java.util.function.Consumer> handler; private final Lock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); @@ -94,7 +95,6 @@ public class EndlessConsumer implements Runnable log.info("{} - Received {} messages", id, records.count()); for (ConsumerRecord record : records) { - consumed++; log.info( "{} - {}: {}/{} - {}={}", id, @@ -105,6 +105,10 @@ public class EndlessConsumer implements Runnable record.value() ); + handler.accept(record); + + consumed++; + Integer partition = record.partition(); String key = record.key() == null ? "NULL" : record.key(); Map byKey = seen.get(partition);