private final Consumer<String, String> consumer;
private final Thread workerThread;
- private final Map<Integer, Long> counterState = new HashMap<>();
+ private final Map<String, Long> counterState = new HashMap<>();
private volatile boolean running = false;
private long consumed = 0;
{
consumed++;
log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
- Integer counted = Integer.parseInt(key);
- Long counter = counterState.compute(counted, (k, v) -> v == null ? 1l : v + 1);
- log.info("{} - current value for counter {}: {}", id, counted, counter);
+ Long counter = counterState.compute(key, (k, v) -> v == null ? 1l : v + 1);
+ log.info("{} - current value for counter {}: {}", id, key, counter);
}