From: Kai Moritz Date: Sat, 17 Sep 2022 09:16:26 +0000 (+0200) Subject: Concurrency auf 2 gesetzt und Verzögerung eingebaut X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=efc1fcea800a7c3d9cde94acd9c95e16c3bfb317;p=demos%2Fkafka%2Ftraining Concurrency auf 2 gesetzt und Verzögerung eingebaut * Durch die künstliche Verzögerung von Nachrichten für `+klaus+` wird deutlich erkennbar, dass die Nachrichten jetzt durch 2 Threads verarbeitet werden. --- diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 2075781..0590319 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -18,6 +18,7 @@ import java.util.Optional; @Slf4j @KafkaListener( id = "${spring.kafka.consumer.group-id}", + concurrency = "2", topics = "${sumup.adder.topic}") public class ApplicationRecordHandler { @@ -39,6 +40,14 @@ public class ApplicationRecordHandler { log.debug("{} - Received {} for {} on {}", id, message, user, partition); state.get(partition).addToSum(user, message.getNext()); + if ("klaus".equals(user)) + { + try + { + Thread.sleep(1000); + } + catch (InterruptedException e) {} + } throttle(); }