Springify: Gemeinsame DLQ für Poison Pills und Fachlogik-Fehler konfiguriert
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.apache.kafka.common.TopicPartition;
7 import org.springframework.beans.factory.annotation.Autowired;
8 import org.springframework.beans.factory.annotation.Value;
9 import org.springframework.kafka.annotation.KafkaListener;
10 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
11 import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
12 import org.springframework.stereotype.Component;
13
14 import java.util.Collection;
15 import java.util.HashMap;
16 import java.util.Map;
17 import java.util.function.Consumer;
18
19
20 @Component
21 @Slf4j
22 @RequiredArgsConstructor
23 public class EndlessConsumer<K, V> implements ConsumerAwareRebalanceListener
24 {
25   @Autowired
26   private KafkaListenerEndpointRegistry registry;
27   @Value("${spring.kafka.consumer.client-id}")
28   String id;
29   @Autowired
30   Consumer<ConsumerRecord<K, V>> handler;
31
32   private long consumed = 0;
33
34   private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
35   private final Map<Integer, Long> offsets = new HashMap<>();
36
37
38   @Override
39   public void onPartitionsRevokedBeforeCommit(
40       org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
41       Collection<TopicPartition> partitions)
42   {
43     partitions.forEach(tp ->
44     {
45       Integer partition = tp.partition();
46       Long newOffset = consumer.position(tp);
47       Long oldOffset = offsets.remove(partition);
48       log.info(
49           "{} - removing partition: {}, consumed {} records (offset {} -> {})",
50           id,
51           partition,
52           newOffset - oldOffset,
53           oldOffset,
54           newOffset);
55       Map<String, Long> removed = seen.remove(partition);
56       for (String key : removed.keySet())
57       {
58         log.info(
59             "{} - Seen {} messages for partition={}|key={}",
60             id,
61             removed.get(key),
62             partition,
63             key);
64       }
65     });
66   }
67
68   @Override
69   public void onPartitionsAssigned(
70       org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
71       Collection<TopicPartition> partitions)
72   {
73     partitions.forEach(tp ->
74     {
75       Integer partition = tp.partition();
76       Long offset = consumer.position(tp);
77       log.info("{} - adding partition: {}, offset={}", id, partition, offset);
78       offsets.put(partition, offset);
79       seen.put(partition, new HashMap<>());
80     });
81   }
82
83
84   @KafkaListener(
85       id = "${spring.kafka.consumer.client-id}",
86       idIsGroup = false,
87       topics = "${consumer.topic}",
88       autoStartup = "false")
89   public void receive(ConsumerRecord<K, V> record)
90   {
91     log.info(
92         "{} - {}: {}/{} - {}={}",
93         id,
94         record.offset(),
95         record.topic(),
96         record.partition(),
97         record.key(),
98         record.value()
99     );
100
101     handler.accept(record);
102
103     consumed++;
104   }
105
106
107   public synchronized void start()
108   {
109     if (registry.getListenerContainer(id).isChildRunning())
110       throw new IllegalStateException("Consumer instance " + id + " is already running!");
111
112     log.info("{} - Starting - consumed {} messages before", id, consumed);
113     registry.getListenerContainer(id).start();
114   }
115
116   public synchronized void stop()
117   {
118     if (!registry.getListenerContainer(id).isChildRunning())
119       throw new IllegalStateException("Consumer instance " + id + " is not running!");
120
121     log.info("{} - Stopping", id);
122     registry.getListenerContainer(id).stop();
123     log.info("{} - Stopped - consumed {} messages so far", id, consumed);
124   }
125
126   public synchronized boolean isRunning()
127   {
128     return registry.getListenerContainer(id).isChildRunning();
129   }
130 }