Concurrency auf 2 gesetzt und Verzögerung eingebaut sumup-adder--springified--concurrency
authorKai Moritz <kai@juplo.de>
Sat, 17 Sep 2022 09:16:26 +0000 (11:16 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 17 Sep 2022 09:16:32 +0000 (11:16 +0200)
* Durch die künstliche Verzögerung von Nachrichten für `+klaus+` wird
  deutlich erkennbar, dass die Nachrichten jetzt durch 2 Threads
  verarbeitet werden.

src/main/java/de/juplo/kafka/ApplicationRecordHandler.java

index 2075781..0590319 100644 (file)
@@ -18,6 +18,7 @@ import java.util.Optional;
 @Slf4j
 @KafkaListener(
     id = "${spring.kafka.consumer.group-id}",
+    concurrency = "2",
     topics = "${sumup.adder.topic}")
 public class ApplicationRecordHandler
 {
@@ -39,6 +40,14 @@ public class ApplicationRecordHandler
   {
     log.debug("{} - Received {} for {} on {}", id, message, user, partition);
     state.get(partition).addToSum(user, message.getNext());
+    if ("klaus".equals(user))
+    {
+      try
+      {
+        Thread.sleep(1000);
+      }
+      catch (InterruptedException e) {}
+    }
     throttle();
   }