Springify: Start/Stop prüft, ob der Container schon/noch läuft
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.springframework.beans.factory.annotation.Autowired;
7 import org.springframework.beans.factory.annotation.Value;
8 import org.springframework.kafka.annotation.KafkaListener;
9 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
10 import org.springframework.stereotype.Component;
11
12 import javax.annotation.PreDestroy;
13 import java.util.List;
14 import java.util.function.Consumer;
15
16
17 @Component
18 @Slf4j
19 @RequiredArgsConstructor
20 public class EndlessConsumer<K, V>
21 {
22   @Autowired
23   private KafkaListenerEndpointRegistry registry;
24   @Value("${consumer.client-id}")
25   String id;
26   @Autowired
27   Consumer<ConsumerRecord<K, V>> handler;
28
29   private long consumed = 0;
30
31   @KafkaListener(
32       id = "${consumer.client-id}",
33       idIsGroup = false,
34       topics = "${consumer.topic}",
35       containerFactory = "batchFactory",
36       autoStartup = "false")
37   public void receive(List<ConsumerRecord<K, V>> records)
38   {
39     // Do something with the data...
40     log.info("{} - Received {} messages", id, records.size());
41     for (ConsumerRecord<K, V> record : records)
42     {
43       log.info(
44           "{} - {}: {}/{} - {}={}",
45           id,
46           record.offset(),
47           record.topic(),
48           record.partition(),
49           record.key(),
50           record.value()
51       );
52
53       handler.accept(record);
54
55       consumed++;
56     }
57   }
58
59
60   public synchronized void start()
61   {
62     if (registry.getListenerContainer(id).isChildRunning())
63       throw new IllegalStateException("Consumer instance " + id + " is already running!");
64
65     log.info("{} - Starting - consumed {} messages before", id, consumed);
66     registry.getListenerContainer(id).start();
67   }
68
69   public synchronized void stop()
70   {
71     if (!registry.getListenerContainer(id).isChildRunning())
72       throw new IllegalStateException("Consumer instance " + id + " is not running!");
73
74     log.info("{} - Stopping", id);
75     registry.getListenerContainer(id).stop();
76     log.info("{} - Stopped - consumed {} messages so far", id, consumed);
77   }
78
79   @PreDestroy
80   public void destroy()
81   {
82     log.info("{} - Destroy!", id);
83     try
84     {
85       stop();
86     }
87     catch (IllegalStateException e)
88     {
89       log.info("{} - Was already stopped", id);
90     }
91     catch (Exception e)
92     {
93       log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
94     }
95     finally
96     {
97       log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
98     }
99   }
100 }