1 package de.juplo.kafka;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
22 @RequiredArgsConstructor
23 public class EndlessConsumer<K, V> implements ConsumerAwareRebalanceListener
26 private KafkaListenerEndpointRegistry registry;
27 @Value("${spring.kafka.consumer.client-id}")
30 Consumer<ConsumerRecord<K, V>> handler;
32 private long consumed = 0;
34 private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
35 private final Map<Integer, Long> offsets = new HashMap<>();
39 public void onPartitionsRevokedBeforeCommit(
40 org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
41 Collection<TopicPartition> partitions)
43 partitions.forEach(tp ->
45 Integer partition = tp.partition();
46 Long newOffset = consumer.position(tp);
47 Long oldOffset = offsets.remove(partition);
49 "{} - removing partition: {}, consumed {} records (offset {} -> {})",
52 newOffset - oldOffset,
55 Map<String, Long> removed = seen.remove(partition);
56 for (String key : removed.keySet())
59 "{} - Seen {} messages for partition={}|key={}",
69 public void onPartitionsAssigned(
70 org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
71 Collection<TopicPartition> partitions)
73 partitions.forEach(tp ->
75 Integer partition = tp.partition();
76 Long offset = consumer.position(tp);
77 log.info("{} - adding partition: {}, offset={}", id, partition, offset);
78 offsets.put(partition, offset);
79 seen.put(partition, new HashMap<>());
85 id = "${spring.kafka.consumer.client-id}",
87 topics = "${consumer.topic}",
88 autoStartup = "false")
89 public void receive(ConsumerRecord<K, V> record)
92 "{} - {}: {}/{} - {}={}",
101 handler.accept(record);
107 public synchronized void start()
109 if (registry.getListenerContainer(id).isChildRunning())
110 throw new IllegalStateException("Consumer instance " + id + " is already running!");
112 log.info("{} - Starting - consumed {} messages before", id, consumed);
113 registry.getListenerContainer(id).start();
116 public synchronized void stop()
118 if (!registry.getListenerContainer(id).isChildRunning())
119 throw new IllegalStateException("Consumer instance " + id + " is not running!");
121 log.info("{} - Stopping", id);
122 registry.getListenerContainer(id).stop();
123 log.info("{} - Stopped - consumed {} messages so far", id, consumed);
126 public synchronized boolean isRunning()
128 return registry.getListenerContainer(id).isChildRunning();