* Unerwartete aber eigentlich logische Folge:
** Es werden weiterhing alle 100 Nachrichten abgeholt
** Der Testfall schlägt trotzdem nicht fehl, da die Erwartung an die
gespeicherten Offsets dynamisch aus den tatsächlich vom Listener
bezogenen Nachrichten bestimmt wird.
* Der Effekt tritt ein, weil der `ErrorHandlingDeserializer` ja die
verursachende Exception schluckt und dafür eine Behandlung des Fehlers
vornimmt, die den Fehler für die nachfolgenden Komponenten transparenter
machen soll.
* Es ist aber eher das Gegenteil der Fall, da die Folge ist, dass der
`poll()`-Aufruf alle Nachrichten - _auch die nach dem Fehler!_ - abholt.
* D.h., es ist nicht mehr so simpel und elegant erkennbar, an welcher
Stelle die `RecordDeserializationException` aufgetreten ist
* Wenn man - wie hier beabsichtigt - die Konsumption beim Auftreten des
dieser Sorte Ausnahme-Fehler stoppen und die Offsets für alle _vor_ dem
Auftreten des Fehlers fehlerfrei empfangenen Nachrichten dafür speichern
will, dann ist dies ohne eine umständliche Offset-Verwaltung und vielen
`seek()`-Aufrufen nicht mehr möglich!
* Eine einfache Abhilfe scheint nicht möglich, da der
`KafkaMessageListenerContainer` nicht damit umgehen kann, wenn _in_ dem
`poll()` eine Exception geworfen wird und dann - wie beobachtet - in
einer Endlosschleife hängen bleibt.
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
import java.util.function.Consumer;
import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
import java.util.function.Consumer;
+ @Bean
+ public KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory<String, Long> consumerFactory)
+ {
+ ConcurrentKafkaListenerContainerFactory<String, Long> factory =
+ new ConcurrentKafkaListenerContainerFactory<>();
+
+ factory.setConsumerFactory(consumerFactory);
+ factory.setBatchListener(true);
+
+ return factory;
+ }
+
@Bean
public CommonContainerStoppingErrorHandler errorHandler()
{
@Bean
public CommonContainerStoppingErrorHandler errorHandler()
{
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;
import java.util.function.Consumer;
Consumer<ConsumerRecord<K, V>> handler;
Consumer<ConsumerRecord<K, V>> handler;
- @KafkaListener(topics = "${consumer.topic}")
- public void receive(ConsumerRecord<K, V> record)
+ @KafkaListener(topics = "${consumer.topic}", containerFactory = "batchFactory")
+ public void receive(List<ConsumerRecord<K, V>> records)
- log.info(
- "{} - {}: {}/{} - {}={}",
- id,
- record.offset(),
- record.topic(),
- record.partition(),
- record.key(),
- record.value()
- );
+ // Do something with the data...
+ log.info("{} - Received {} messages", id, records.size());
+ for (ConsumerRecord<K, V> record : records)
+ {
+ log.info(
+ "{} - {}: {}/{} - {}={}",
+ id,
+ record.offset(),
+ record.topic(),
+ record.partition(),
+ record.key(),
+ record.value()
+ );
- handler.accept(record);
+ handler.accept(record);
+ }