--- /dev/null
+package de.juplo.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class CounterState
+{
+ private final Map<String, Long> counterState = new HashMap<>();
+
+
+ public void setCounterState(String key, long counter)
+ {
+ counterState.put(key, counter);
+ }
+
+ public synchronized Long addToCounter(String key)
+ {
+ return counterState.compute(key, (k, v) -> v == null ? 1l : v + 1);
+ }
+
+ public synchronized Map<String, Long> getCounterState()
+ {
+ return counterState;
+ }
+}
private final Thread workerThread;
private final Runnable closeCallback;
- private final Map<String, Long> counterState = new HashMap<>();
+ private final CounterState counterState = new CounterState();
private final String stateTopic;
private final Producer<String, String> producer;
String key,
String value)
{
- counterState.put(key, Long.parseLong(value));
+ counterState.setCounterState(key, Long.parseLong(value));
if (offset + 1 == stateEndOffsets[partition])
{
log.info(
private synchronized Long computeCount(String key)
{
- return counterState.compute(key, (k, v) -> v == null ? 1l : v + 1);
+ return counterState.addToCounter(key);
}
- public synchronized Map<String, Long> getCounterState()
+ public Map<String, Long> getCounterState()
{
- return counterState;
+ return counterState.getCounterState();
}
void sendCounterState(int partition, String key, Long counter)