juplo.producer.throttle-ms: 100
consumer:
- image: juplo/spring-consumer:1.1-SNAPSHOT
+ image: juplo/spring-consumer:1.1-log-compaction-SNAPSHOT
environment:
juplo.bootstrap-server: kafka:9092
juplo.client-id: consumer
<artifactId>spring-consumer</artifactId>
<name>Spring Consumer</name>
<description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
- <version>1.1-SNAPSHOT</version>
+ <version>1.1-log-compaction-SNAPSHOT</version>
<properties>
<java.version>21</java.version>
import java.time.Duration;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
@Slf4j
private final Thread workerThread;
private final Runnable closeCallback;
+ private final Map<Integer, Long> counterState = new HashMap<>();
+
private volatile boolean running = false;
private long consumed = 0;
{
consumed++;
log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
+ Integer counted = Integer.parseInt(key);
+ Long counter = counterState.compute(counted, (k, v) -> v == null ? 1l : v + 1);
+ log.info("{} - current value for counter {}: {}", id, counted, counter);
}