From efc1fcea800a7c3d9cde94acd9c95e16c3bfb317 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 17 Sep 2022 11:16:26 +0200 Subject: [PATCH] =?utf8?q?Concurrency=20auf=202=20gesetzt=20und=20Verz?= =?utf8?q?=C3=B6gerung=20eingebaut?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Durch die künstliche Verzögerung von Nachrichten für `+klaus+` wird deutlich erkennbar, dass die Nachrichten jetzt durch 2 Threads verarbeitet werden. --- .../java/de/juplo/kafka/ApplicationRecordHandler.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 2075781..0590319 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -18,6 +18,7 @@ import java.util.Optional; @Slf4j @KafkaListener( id = "${spring.kafka.consumer.group-id}", + concurrency = "2", topics = "${sumup.adder.topic}") public class ApplicationRecordHandler { @@ -39,6 +40,14 @@ public class ApplicationRecordHandler { log.debug("{} - Received {} for {} on {}", id, message, user, partition); state.get(partition).addToSum(user, message.getNext()); + if ("klaus".equals(user)) + { + try + { + Thread.sleep(1000); + } + catch (InterruptedException e) {} + } throttle(); } -- 2.20.1