From d72b7b754699cbd0df8aeaa955730dbbeff6e88e Mon Sep 17 00:00:00 2001
From: Kai Moritz <kai@juplo.de>
Date: Sun, 27 Oct 2024 13:38:47 +0100
Subject: [PATCH] =?utf8?q?GET-Endpoint=20zum=20Abfragen=20der=20Schl=C3=BC?=
 =?utf8?q?ssel-Z=C3=A4hlungen=20erg=C3=A4nzt?=
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* Zugriff auf die `counterMap` in `ExampleConsumer` synchronisiert.
* `CounterStateController` kopiert die Map, um mögliche konkurierende
  Zugriffe während des Erzeugens der Ausgabe zu vermeiden.
---
 .../juplo/kafka/CounterStateController.java   | 22 +++++++++++++++++++
 .../java/de/juplo/kafka/ExampleConsumer.java  | 11 +++++++++-
 2 files changed, 32 insertions(+), 1 deletion(-)
 create mode 100644 src/main/java/de/juplo/kafka/CounterStateController.java

diff --git a/src/main/java/de/juplo/kafka/CounterStateController.java b/src/main/java/de/juplo/kafka/CounterStateController.java
new file mode 100644
index 00000000..3d965829
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/CounterStateController.java
@@ -0,0 +1,22 @@
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+@RestController
+@RequiredArgsConstructor
+public class CounterStateController
+{
+  private final ExampleConsumer consumer;
+
+  @GetMapping
+  Map<String, Long> getAllCounters()
+  {
+    return new HashMap<>(consumer.getCounterState());
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java
index 0ade38fc..6767c6bb 100644
--- a/src/main/java/de/juplo/kafka/ExampleConsumer.java
+++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java
@@ -98,10 +98,19 @@ public class ExampleConsumer implements Runnable
   {
     consumed++;
     log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
-    Long counter = counterState.compute(key, (k, v) -> v == null ? 1l : v + 1);
+    Long counter = computeCount(key);
     log.info("{} - current value for counter {}: {}", id, key, counter);
   }
 
+  private synchronized Long computeCount(String key)
+  {
+    return counterState.compute(key, (k, v) -> v == null ? 1l : v + 1);
+  }
+
+  public synchronized Map<String, Long> getCounterState()
+  {
+    return counterState;
+  }
 
   public void shutdown() throws InterruptedException
   {
-- 
2.20.1