#!/bin/bash
-IMAGE=juplo/spring-consumer:1.1-SNAPSHOT
+IMAGE=juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT
if [ "$1" = "cleanup" ]
then
fi
docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3
-docker compose -f docker/docker-compose.yml rm -svf consumer
+docker compose -f docker/docker-compose.yml rm -svf consumer-1 consumer-2
if [[
$(docker image ls -q $IMAGE) == "" ||
docker compose -f docker/docker-compose.yml up -d producer
-docker compose -f docker/docker-compose.yml up -d consumer
+docker compose -f docker/docker-compose.yml up -d consumer-1
+sleep 10
+docker compose -f docker/docker-compose.yml exec cli http -v consumer-1:8881/
-sleep 5
-docker compose -f docker/docker-compose.yml stop consumer
+docker compose -f docker/docker-compose.yml up -d consumer-2
+sleep 10
+docker compose -f docker/docker-compose.yml exec cli http -v consumer-1:8881/
+docker compose -f docker/docker-compose.yml exec cli http -v consumer-2:8881/
-docker compose -f docker/docker-compose.yml start consumer
-sleep 5
-
-docker compose -f docker/docker-compose.yml stop producer consumer
-docker compose -f docker/docker-compose.yml logs consumer
+docker compose -f docker/docker-compose.yml stop producer consumer-1
juplo.client-id: producer
juplo.producer.topic: test
juplo.producer.linger-ms: 666
- juplo.producer.throttle-ms: 100
+ juplo.producer.throttle-ms: 10
consumer:
- image: juplo/spring-consumer:1.1-SNAPSHOT
+ image: juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT
environment:
juplo.bootstrap-server: kafka:9092
juplo.client-id: consumer
juplo.consumer.topic: test
+ juplo.producer.linger-ms: 1000
+ logging.level.de.juplo: TRACE
peter:
- image: juplo/spring-consumer:1.1-SNAPSHOT
+ image: juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT
environment:
juplo.bootstrap-server: kafka:9092
juplo.client-id: peter
juplo.consumer.topic: test
+ juplo.producer.linger-ms: 1000
+ logging.level.de.juplo: TRACE
ute:
- image: juplo/spring-consumer:1.1-SNAPSHOT
+ image: juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT
environment:
juplo.bootstrap-server: kafka:9092
juplo.client-id: ute
juplo.consumer.topic: test
+ juplo.producer.linger-ms: 1000
+ logging.level.de.juplo: TRACE
volumes:
zookeeper-data:
<artifactId>spring-consumer</artifactId>
<name>Spring Consumer</name>
<description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
- <version>1.1-SNAPSHOT</version>
+ <version>1.1-rebalance-listener-SNAPSHOT</version>
<properties>
<java.version>21</java.version>
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+@RequiredArgsConstructor
+public class CounterState<K>
+{
+ private final Map<K, Long> counterState;
+
+
+ public synchronized Long addToCounter(K key)
+ {
+ return counterState.compute(key, (k, v) -> v == null ? 1l : v + 1);
+ }
+
+ public synchronized Map<K, Long> getCounterState()
+ {
+ return new HashMap<>(counterState);
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.Map;
+
+
+@RestController
+@RequiredArgsConstructor
+public class CounterStateController<K>
+{
+ private final ExampleConsumer<K, ?> consumer;
+
+ @GetMapping
+ Map<Integer, Map<K, Long>> getAllCounters()
+ {
+ return consumer.getCounterState();
+ }
+}
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import java.time.Duration;
-import java.util.Arrays;
+import java.util.*;
@Slf4j
-public class ExampleConsumer<K, V> implements Runnable
+public class ExampleConsumer<K, V> implements Runnable, ConsumerRebalanceListener
{
private final String id;
private final String topic;
private final Runnable closeCallback;
private volatile boolean running = false;
+ private final Set<TopicPartition> assignedPartitions = new HashSet<>();
+ private CounterState<K>[] counterState;
private long consumed = 0;
{
try
{
+ log.info("{} - Fetching PartitionInfo for topic {}", id, topic);
+ int numPartitions = consumer.partitionsFor(topic).size();
+ log.info("{} - Topic {} has {} partitions", id, topic, numPartitions);
+ counterState = new CounterState[numPartitions];
+
log.info("{} - Subscribing to topic {}", id, topic);
- consumer.subscribe(Arrays.asList(topic));
+ consumer.subscribe(Arrays.asList(topic), this);
running = true;
while (running)
{
consumed++;
log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
+
+ Long counter = computeCount(partition, key);
+ log.info("{} - current value for counter {}: {}", id, key, counter);
+ }
+
+ private synchronized Long computeCount(int partition, K key)
+ {
+ return counterState[partition].addToCounter(key);
+ }
+
+ public Map<Integer, Map<K, Long>> getCounterState()
+ {
+ Map<Integer, Map<K, Long>> result = new HashMap<>(assignedPartitions.size());
+ assignedPartitions.forEach(tp -> result.put(tp.partition(), counterState[tp.partition()].getCounterState()));
+ return result;
+ }
+
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ {
+ partitions
+ .stream()
+ .filter(partition -> partition.topic().equals(topic))
+ .forEach(partition ->
+ {
+ assignedPartitions.add(partition);
+ counterState[partition.partition()] = new CounterState<>(new HashMap<>());
+ });
+ }
+
+ @Override
+ public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
+ partitions
+ .stream()
+ .filter(partition -> partition.topic().equals(topic))
+ .forEach(partition ->
+ {
+ assignedPartitions.remove(partition);
+ counterState[partition.partition()] = null;
+ });
}
@SpringBootTest(
properties = {
"juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "spring.kafka.consumer.auto-offset-reset=earliest",
"juplo.consumer.topic=" + TOPIC })
@AutoConfigureMockMvc
@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)