`@KafkaListener` mit paraleller Verarbeitung spring/spring-consumer--kafkalistener--concurrency--generics4some
authorKai Moritz <kai@juplo.de>
Sun, 2 Feb 2025 21:14:31 +0000 (22:14 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 1 Mar 2025 21:09:24 +0000 (22:09 +0100)
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 5894a10..58a249e 100644 (file)
@@ -18,7 +18,7 @@ public class ExampleConsumer
   private String id;
   private long consumed = 0;
 
-  @KafkaListener(topics = "${juplo.consumer.topic}")
+  @KafkaListener(topics = "${juplo.consumer.topic}", concurrency = "2")
   private void receive(
     @Header(KafkaHeaders.RECEIVED_TOPIC)
     String topic,
@@ -41,6 +41,9 @@ public class ExampleConsumer
     String key,
     String value)
   {
+    // BEACHTE:
+    // Der Zugriff auf die Variable "consumed" ist hier so nicht mehr korrekt!
+    // Grund: Der Zugriff erfolgt ohne Locking konkurrierend aus zwei Threads.
     consumed++;
     log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
   }