#!/bin/bash
 
-IMAGE=juplo/spring-consumer:1.1-SNAPSHOT
+IMAGE=juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
 fi
 
 docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3
-docker compose -f docker/docker-compose.yml rm -svf consumer
+docker compose -f docker/docker-compose.yml rm -svf ute peter
 
 if [[
   $(docker image ls -q $IMAGE) == "" ||
 
 
 docker compose -f docker/docker-compose.yml up -d producer
-docker compose -f docker/docker-compose.yml up -d consumer
+docker compose -f docker/docker-compose.yml up -d ute
+sleep 10
+docker compose -f docker/docker-compose.yml exec cli http -v ute:8881/
 
-sleep 5
-docker compose -f docker/docker-compose.yml stop consumer
+docker compose -f docker/docker-compose.yml up -d peter
+sleep 10
+docker compose -f docker/docker-compose.yml exec cli http -v peter:8881/
+docker compose -f docker/docker-compose.yml exec cli http -v peter:8881/
 
-docker compose -f docker/docker-compose.yml start consumer
-sleep 5
-
-docker compose -f docker/docker-compose.yml stop producer consumer
-docker compose -f docker/docker-compose.yml logs consumer
+docker compose -f docker/docker-compose.yml stop producer ute peter
 
       - kafka-3
 
   producer:
-    image: juplo/simple-producer:1.0-SNAPSHOT
-    command: kafka:9092 test producer
+    image: juplo/spring-producer:2.0-SNAPSHOT
+    environment:
+      spring.kafka.bootstrap-servers: kafka:9092
+      spring.kafka.client-id: producer
+      spring.kafka.producer.properties.linger.ms: 666
+      juplo.producer.topic: test
+      juplo.producer.throttle: 10
 
   consumer:
-    image: juplo/spring-consumer:1.1-SNAPSHOT
+    # NOTE(review): the run-script now starts the instances "ute" and "peter"
+    # instead of this service – presumably kept for manual experiments; confirm.
+    image: juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: consumer
       juplo.consumer.topic: test
+      juplo.producer.linger-ms: 1000
+      logging.level.de.juplo: TRACE
 
   peter:
-    image: juplo/spring-consumer:1.1-SNAPSHOT
+    image: juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: peter
       juplo.consumer.topic: test
+      juplo.producer.linger-ms: 1000
+      logging.level.de.juplo: TRACE
 
   ute:
-    image: juplo/spring-consumer:1.1-SNAPSHOT
+    image: juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: ute
       juplo.consumer.topic: test
+      juplo.producer.linger-ms: 1000
+      logging.level.de.juplo: TRACE
 
 volumes:
   zookeeper-data:
 
   <artifactId>spring-consumer</artifactId>
   <name>Spring Consumer</name>
   <description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
-  <version>1.1-SNAPSHOT</version>
+  <version>1.1-rebalance-listener-SNAPSHOT</version>
 
   <properties>
     <java.version>21</java.version>
 
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+@RequiredArgsConstructor
+public class CounterState<K>
+{
+  private final Map<String, Long> counterState;
+
+
+  public synchronized Long addToCounter(String key)
+  {
+    return counterState.compute(key, (k, v) -> v == null ? 1l : v + 1);
+  }
+
+  public synchronized Map<String, Long> getCounterState()
+  {
+    return new HashMap<>(counterState);
+  }
+}
 
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.Map;
+
+
+/**
+ * REST endpoint that exposes the current counter state of the
+ * {@link ExampleConsumer} for inspection.
+ */
+@RestController
+@RequiredArgsConstructor
+public class CounterStateController
+{
+  private final ExampleConsumer consumer;
+
+  /**
+   * Served on GET / (no path on the mapping).
+   *
+   * @return the per-key counters, grouped by assigned partition number
+   */
+  @GetMapping
+  Map<Integer, Map<String, Long>> getAllCounters()
+  {
+    return consumer.getCounterState();
+  }
+}
 
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 
 import java.time.Duration;
-import java.util.Arrays;
+import java.util.*;
 
 
 @Slf4j
-public class ExampleConsumer implements Runnable
+public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
 {
   private final String id;
   private final String topic;
   private final Thread workerThread;
   private final Runnable closeCallback;
 
+  private final Set<TopicPartition> assignedPartitions = new HashSet<>();
+  private CounterState<String>[] counterState;
   private long consumed = 0;
 
 
   {
     try
     {
+      log.info("{} - Fetching PartitionInfo for topic {}", id, topic);
+      int numPartitions = consumer.partitionsFor(topic).size();
+      log.info("{} - Topic {} has {} partitions", id, topic, numPartitions);
+      counterState = new CounterState[numPartitions];
+
       log.info("{} - Subscribing to topic {}", id, topic);
-      consumer.subscribe(Arrays.asList(topic));
+      consumer.subscribe(Arrays.asList(topic), this);
 
       while (true)
       {
   {
     consumed++;
     log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
+
+    Long counter = computeCount(partition, key);
+    log.info("{} - current value for counter {}: {}", id, key, counter);
+  }
+
+  // Counts the key in the per-partition state. synchronized: the rebalance
+  // callbacks swap/null the slots of the counterState array, so the array
+  // read must be guarded by the same monitor.
+  private synchronized Long computeCount(int partition, String key)
+  {
+    return counterState[partition].addToCounter(key);
+  }
+
+  public Map<Integer, Map<String, Long>> getCounterState()
+  {
+    Map<Integer, Map<String, Long>> result = new HashMap<>(assignedPartitions.size());
+    assignedPartitions.forEach(tp -> result.put(tp.partition(), counterState[tp.partition()].getCounterState()));
+    return result;
+  }
+
+
+  @Override
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+  {
+    partitions
+      .stream()
+      .filter(partition -> partition.topic().equals(topic))
+      .forEach(partition ->
+      {
+        assignedPartitions.add(partition);
+        counterState[partition.partition()] = new CounterState<>(new HashMap<>());
+      });
+  }
+
+  // Drops the bookkeeping and counter state of revoked partitions.
+  // synchronized: getCounterState() reads assignedPartitions and counterState
+  // concurrently from a web-request thread.
+  @Override
+  public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  {
+    partitions
+      .stream()
+      .filter(partition -> partition.topic().equals(topic))
+      .forEach(partition ->
+      {
+        assignedPartitions.remove(partition);
+        counterState[partition.partition()] = null;
+      });
   }
 
 
 
 @SpringBootTest(
   properties = {
     "juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
+    "spring.kafka.consumer.auto-offset-reset=earliest",
     "juplo.consumer.topic=" + TOPIC })
 @AutoConfigureMockMvc
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)