--- /dev/null
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+@RestController
+@RequiredArgsConstructor
+public class CounterStateController
+{
+ private final ExampleConsumer consumer;
+
+ @GetMapping
+ Map<String, Long> getAllCounters()
+ {
+ return new HashMap<>(consumer.getCounterState());
+ }
+}
{
consumed++;
log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
- Long counter = counterState.compute(key, (k, v) -> v == null ? 1l : v + 1);
+ Long counter = computeCount(key);
log.info("{} - current value for counter {}: {}", id, key, counter);
}
+ private synchronized Long computeCount(String key)
+ {
+ return counterState.compute(key, (k, v) -> v == null ? 1l : v + 1);
+ }
+
+ public synchronized Map<String, Long> getCounterState()
+ {
+ return counterState;
+ }
public void shutdown() throws InterruptedException
{