GET-Endpoint zum Abfragen der Schlüssel-Zählungen ergänzt
authorKai Moritz <kai@juplo.de>
Sun, 27 Oct 2024 12:38:47 +0000 (13:38 +0100)
committerKai Moritz <kai@juplo.de>
Mon, 28 Oct 2024 13:49:59 +0000 (14:49 +0100)
* Zugriff auf die `counterMap` in `ExampleConsumer` synchronisiert.
* `CounterStateController` kopiert die Map, um mögliche konkurierende
  Zugriffe während des Erzeugens der Ausgabe zu vermeiden.

src/main/java/de/juplo/kafka/CounterStateController.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ExampleConsumer.java

diff --git a/src/main/java/de/juplo/kafka/CounterStateController.java b/src/main/java/de/juplo/kafka/CounterStateController.java
new file mode 100644 (file)
index 0000000..3d96582
--- /dev/null
@@ -0,0 +1,22 @@
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+@RestController
+@RequiredArgsConstructor
+public class CounterStateController
+{
+  private final ExampleConsumer consumer;
+
+  @GetMapping
+  Map<String, Long> getAllCounters()
+  {
+    return new HashMap<>(consumer.getCounterState());
+  }
+}
index 517cd6b..4cb6b21 100644 (file)
@@ -90,10 +90,19 @@ public class ExampleConsumer implements Runnable
   {
     consumed++;
     log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
-    Long counter = counterState.compute(key, (k, v) -> v == null ? 1l : v + 1);
+    Long counter = computeCount(key);
     log.info("{} - current value for counter {}: {}", id, key, counter);
   }
 
+  private synchronized Long computeCount(String key)
+  {
+    return counterState.compute(key, (k, v) -> v == null ? 1l : v + 1);
+  }
+
+  public synchronized Map<String, Long> getCounterState()
+  {
+    return counterState;
+  }
 
   public void shutdown() throws InterruptedException
   {