`@KafkaListener` mit paraleller Verarbeitung spring/spring-consumer--kafkalistener--concurrency--generics4all
authorKai Moritz <kai@juplo.de>
Sun, 2 Feb 2025 21:14:31 +0000 (22:14 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 1 Mar 2025 16:02:13 +0000 (17:02 +0100)
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 6618004..fcf5e11 100644 (file)
@@ -18,7 +18,7 @@ public class ExampleConsumer<K, V>
   private String id;
   private long consumed = 0;
 
-  @KafkaListener(topics = "${juplo.consumer.topic}")
+  @KafkaListener(topics = "${juplo.consumer.topic}", concurrency = "2")
   private void receive(
     @Header(KafkaHeaders.RECEIVED_TOPIC)
     String topic,
@@ -41,6 +41,9 @@ public class ExampleConsumer<K, V>
     K key,
     V value)
   {
+    // BEACHTE:
+    // Der Zugriff auf die Variable "consumed" ist hier so nicht mehr korrekt!
+    // Grund: Der Zugriff erfolgt ohne Locking konkurrierend aus zwei Threads.
     consumed++;
     log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
   }