1 package de.juplo.kafka;
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.apache.kafka.common.TopicPartition;
7 import org.springframework.beans.factory.annotation.Autowired;
8 import org.springframework.beans.factory.annotation.Value;
9 import org.springframework.kafka.annotation.KafkaListener;
10 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
11 import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
12 import org.springframework.stereotype.Component;
14 import java.util.Collection;
15 import java.util.HashMap;
17 import java.util.Optional;
18 import java.util.function.Consumer;
// Kafka listener component that doubles as a rebalance listener: it implements
// ConsumerAwareRebalanceListener so it can book-keep per-partition state when
// partitions are assigned or revoked, and it offers start()/stop()/exitStatus()
// to control the listener container registered under its id.
// NOTE(review): this view of the file is incomplete (further annotations and the
// opening brace of the class are not visible here).
23 @RequiredArgsConstructor
24 public class EndlessConsumer<K, V> implements ConsumerAwareRebalanceListener
// Registry used by start(), stop() and exitStatus() to look up the listener
// container by id.
27 private KafkaListenerEndpointRegistry registry;
// Injects the value of the property spring.kafka.consumer.client-id into the
// field declared next. NOTE(review): that declaration is not visible in this
// view; presumably it is the `id` field used throughout the class - confirm.
28 @Value("${spring.kafka.consumer.client-id}")
// Callback that receive() invokes with each record.
31 Consumer<ConsumerRecord<K, V>> handler;
// Collaborator that start() asks to clear its exception and exitStatus() asks
// for the exception it holds.
33 ApplicationErrorHandler errorHandler;
// Message counter reported in the log output of start() and stop().
// NOTE(review): the statement that updates it is not visible in this view.
35 private long consumed = 0;
// Per partition number: a map from key to a count. An empty map is registered
// when a partition is assigned; it is removed and logged when the partition is
// revoked.
37 private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
// Per partition number: the consumer position recorded when the partition was
// assigned; removed again when the partition is revoked.
38 private final Map<Integer, Long> offsets = new HashMap<>();
42 public void onPartitionsRevokedBeforeCommit(
43 org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
44 Collection<TopicPartition> partitions)
46 partitions.forEach(tp ->
48 Integer partition = tp.partition();
49 Long newOffset = consumer.position(tp);
50 Long oldOffset = offsets.remove(partition);
52 "{} - removing partition: {}, consumed {} records (offset {} -> {})",
55 newOffset - oldOffset,
58 Map<String, Long> removed = seen.remove(partition);
59 for (String key : removed.keySet())
62 "{} - Seen {} messages for partition={}|key={}",
72 public void onPartitionsAssigned(
73 org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
74 Collection<TopicPartition> partitions)
76 partitions.forEach(tp ->
78 Integer partition = tp.partition();
79 Long offset = consumer.position(tp);
80 log.info("{} - adding partition: {}, offset={}", id, partition, offset);
81 offsets.put(partition, offset);
82 seen.put(partition, new HashMap<>());
// Attributes of the listener annotation on receive(). NOTE(review): the line
// that opens the annotation and one attribute line are not visible in this view.
// The listener id is resolved from the property spring.kafka.consumer.client-id.
88 id = "${spring.kafka.consumer.client-id}",
// The topic(s) to listen on are resolved from the property consumer.topic.
90 topics = "${consumer.topic}",
// autoStartup is set to "false"; start() starts the listener container
// explicitly through the registry.
91 autoStartup = "false")
// Listener method: called with a single consumer record.
92 public void receive(ConsumerRecord<K, V> record)
// Format string (six placeholders) of a log statement whose call and arguments
// are not visible in this view.
95 "{} - {}: {}/{} - {}={}",
// Hands the record over to the injected handler.
104 handler.accept(record);
110 public synchronized void start()
112 if (registry.getListenerContainer(id).isChildRunning())
113 throw new IllegalStateException("Consumer instance " + id + " is already running!");
115 log.info("{} - Starting - consumed {} messages before", id, consumed);
116 errorHandler.clearException();
117 registry.getListenerContainer(id).start();
120 public synchronized void stop()
122 if (!registry.getListenerContainer(id).isChildRunning())
123 throw new IllegalStateException("Consumer instance " + id + " is not running!");
125 log.info("{} - Stopping", id);
126 registry.getListenerContainer(id).stop();
127 log.info("{} - Stopped - consumed {} messages so far", id, consumed);
130 public synchronized Optional<Exception> exitStatus()
132 if (registry.getListenerContainer(id).isChildRunning())
133 throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
135 return errorHandler.getException();