From: Kai Moritz Date: Thu, 14 Apr 2022 16:56:04 +0000 (+0200) Subject: Springify: BatchListener konfiguriert - Hilft nicht wirklich... X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;ds=sidebyside;h=9b3731e4a7aeca27f7ba11502bc7f0a66c1d22b8;p=demos%2Fkafka%2Ftraining Springify: BatchListener konfiguriert - Hilft nicht wirklich... * Unerwartete aber eigentlich logische Folge: ** Es werden weiterhing alle 100 Nachrichten abgeholt ** Der Testfall schlägt trotzdem nicht fehl, da die Erwartung an die gespeicherten Offsets dynamisch aus den tatsächlich vom Listener bezogenen Nachrichten bestimmt wird. * Der Effekt tritt ein, weil der `ErrorHandlingDeserializer` ja die verursachende Exception schluckt und dafür eine Behandlung des Fehlers vornimmt, die den Fehler für die nachfolgenden Komponenten transparenter machen soll. * Es ist aber eher das Gegenteil der Fall, da die Folge ist, dass der `poll()`-Aufruf alle Nachrichten - _auch die nach dem Fehler!_ - abholt. * D.h., es ist nicht mehr so simpel und elegant erkennbar, an welcher Stelle die `RecordDeserializationException` aufgetreten ist * Wenn man - wie hier beabsichtigt - die Konsumption beim Auftreten des dieser Sorte Ausnahme-Fehler stoppen und die Offsets für alle _vor_ dem Auftreten des Fehlers fehlerfrei empfangenen Nachrichten dafür speichern will, dann ist dies ohne eine umständliche Offset-Verwaltung und vielen `seek()`-Aufrufen nicht mehr möglich! * Eine einfache Abhilfe scheint nicht möglich, da der `KafkaMessageListenerContainer` nicht damit umgehen kann, wenn _in_ dem `poll()` eine Exception geworfen wird und dann - wie beobachtet - in einer Endlosschleife hängen bleibt. --- diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index fd4ff28..4c74eb2 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -4,6 +4,9 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler; import java.util.function.Consumer; @@ -22,6 +25,18 @@ public class ApplicationConfiguration }; } + @Bean + public KafkaListenerContainerFactory batchFactory(ConsumerFactory consumerFactory) + { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + + factory.setConsumerFactory(consumerFactory); + factory.setBatchListener(true); + + return factory; + } + @Bean public CommonContainerStoppingErrorHandler errorHandler() { diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 87780b4..929bdbd 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -8,6 +8,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; +import java.util.List; import java.util.function.Consumer; @@ -22,19 +23,24 @@ public class EndlessConsumer Consumer> handler; - @KafkaListener(topics = "${consumer.topic}") - public void receive(ConsumerRecord record) + @KafkaListener(topics = "${consumer.topic}", containerFactory = "batchFactory") + public void receive(List> records) { - log.info( - "{} - {}: {}/{} - {}={}", - id, - record.offset(), - record.topic(), - record.partition(), - record.key(), - record.value() - ); + // Do something with the data... + log.info("{} - Received {} messages", id, records.size()); + for (ConsumerRecord record : records) + { + log.info( + "{} - {}: {}/{} - {}={}", + id, + record.offset(), + record.topic(), + record.partition(), + record.key(), + record.value() + ); - handler.accept(record); + handler.accept(record); + } } }