Springify: BatchListener konfiguriert - Hilft nicht wirklich...
authorKai Moritz <kai@juplo.de>
Thu, 14 Apr 2022 16:56:04 +0000 (18:56 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 15 Apr 2022 08:29:58 +0000 (10:29 +0200)
* Unerwartete aber eigentlich logische Folge:
** Es werden weiterhing alle 100 Nachrichten abgeholt
** Der Testfall schlägt trotzdem nicht fehl, da die Erwartung an die
   gespeicherten Offsets dynamisch aus den tatsächlich vom Listener
   bezogenen Nachrichten bestimmt wird.
* Der Effekt tritt ein, weil der `ErrorHandlingDeserializer` ja die
  verursachende Exception schluckt und dafür eine Behandlung des Fehlers
  vornimmt, die den Fehler für die nachfolgenden Komponenten transparenter
  machen soll.
* Es ist aber eher das Gegenteil der Fall, da die Folge ist, dass der
  `poll()`-Aufruf alle Nachrichten - _auch die nach dem Fehler!_ - abholt.
* D.h., es ist nicht mehr so simpel und elegant erkennbar, an welcher
  Stelle die `RecordDeserializationException` aufgetreten ist
* Wenn man - wie hier beabsichtigt - die Konsumption beim Auftreten des
  dieser Sorte Ausnahme-Fehler stoppen und die Offsets für alle _vor_ dem
  Auftreten des Fehlers fehlerfrei empfangenen Nachrichten dafür speichern
  will, dann ist dies ohne eine umständliche Offset-Verwaltung und vielen
  `seek()`-Aufrufen nicht mehr möglich!
* Eine einfache Abhilfe scheint nicht möglich, da der
  `KafkaMessageListenerContainer` nicht damit umgehen kann, wenn _in_ dem
  `poll()` eine Exception geworfen wird und dann - wie beobachtet - in
  einer Endlosschleife hängen bleibt.

src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/EndlessConsumer.java

index fd4ff28..4c74eb2 100644 (file)
@@ -4,6 +4,9 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
 
 import java.util.function.Consumer;
@@ -22,6 +25,18 @@ public class ApplicationConfiguration
     };
   }
 
+  @Bean
+  public KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory<String, Long> consumerFactory)
+  {
+    ConcurrentKafkaListenerContainerFactory<String, Long> factory =
+        new ConcurrentKafkaListenerContainerFactory<>();
+
+    factory.setConsumerFactory(consumerFactory);
+    factory.setBatchListener(true);
+
+    return factory;
+  }
+
   @Bean
   public CommonContainerStoppingErrorHandler errorHandler()
   {
index 87780b4..929bdbd 100644 (file)
@@ -8,6 +8,7 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
 
+import java.util.List;
 import java.util.function.Consumer;
 
 
@@ -22,19 +23,24 @@ public class EndlessConsumer<K, V>
   Consumer<ConsumerRecord<K, V>> handler;
 
 
-  @KafkaListener(topics = "${consumer.topic}")
-  public void receive(ConsumerRecord<K, V> record)
+  @KafkaListener(topics = "${consumer.topic}", containerFactory = "batchFactory")
+  public void receive(List<ConsumerRecord<K, V>> records)
   {
-    log.info(
-        "{} - {}: {}/{} - {}={}",
-        id,
-        record.offset(),
-        record.topic(),
-        record.partition(),
-        record.key(),
-        record.value()
-    );
+    // Do something with the data...
+    log.info("{} - Received {} messages", id, records.size());
+    for (ConsumerRecord<K, V> record : records)
+    {
+      log.info(
+          "{} - {}: {}/{} - {}={}",
+          id,
+          record.offset(),
+          record.topic(),
+          record.partition(),
+          record.key(),
+          record.value()
+      );
 
-    handler.accept(record);
+      handler.accept(record);
+    }
   }
 }