From: Kai Moritz Date: Sun, 2 Feb 2025 21:14:31 +0000 (+0100) Subject: `@KafkaListener` mit paraleller Verarbeitung X-Git-Tag: spring/spring-consumer--kafkalistener--concurrency--COMMITS--2025-02 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=16fa02f581ad5b009e0cbb540842065534ec8676;p=demos%2Fkafka%2Ftraining `@KafkaListener` mit paraleller Verarbeitung --- diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 5894a103..58a249e0 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -18,7 +18,7 @@ public class ExampleConsumer private String id; private long consumed = 0; - @KafkaListener(topics = "${juplo.consumer.topic}") + @KafkaListener(topics = "${juplo.consumer.topic}", concurrency = "2") private void receive( @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @@ -41,6 +41,9 @@ public class ExampleConsumer String key, String value) { + // BEACHTE: + // Der Zugriff auf die Variable "consumed" ist hier so nicht mehr korrekt! + // Grund: Der Zugriff erfolgt ohne Locking konkurrierend aus zwei Threads. consumed++; log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); }