ea899cc41fec5afbe2f053b350d873044e0d9c49
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.springframework.beans.factory.annotation.Autowired;
7 import org.springframework.beans.factory.annotation.Value;
8 import org.springframework.kafka.annotation.KafkaListener;
9 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
10 import org.springframework.stereotype.Component;
11
12 import javax.annotation.PreDestroy;
13 import java.util.List;
14 import java.util.function.Consumer;
15
16
17 @Component
18 @Slf4j
19 @RequiredArgsConstructor
20 public class EndlessConsumer<K, V>
21 {
22   @Autowired
23   private KafkaListenerEndpointRegistry registry;
24   @Value("${consumer.client-id}")
25   String id;
26   @Autowired
27   Consumer<ConsumerRecord<K, V>> handler;
28
29   private long consumed = 0;
30
31   @KafkaListener(
32       id = "${consumer.client-id}",
33       idIsGroup = false,
34       topics = "${consumer.topic}",
35       containerFactory = "batchFactory",
36       autoStartup = "false")
37   public void receive(List<ConsumerRecord<K, V>> records)
38   {
39     // Do something with the data...
40     log.info("{} - Received {} messages", id, records.size());
41     for (ConsumerRecord<K, V> record : records)
42     {
43       log.info(
44           "{} - {}: {}/{} - {}={}",
45           id,
46           record.offset(),
47           record.topic(),
48           record.partition(),
49           record.key(),
50           record.value()
51       );
52
53       handler.accept(record);
54
55       consumed++;
56     }
57   }
58
59
60   public synchronized void start()
61   {
62     log.info("{} - Starting - consumed {} messages before", id, consumed);
63     registry.getListenerContainer(id).start();
64   }
65
66   public synchronized void stop()
67   {
68     log.info("{} - Stopping", id);
69     registry.getListenerContainer(id).stop();
70     log.info("{} - Stopped - consumed {} messages so far", id, consumed);
71   }
72
73   @PreDestroy
74   public void destroy()
75   {
76     log.info("{} - Destroy!", id);
77     stop();
78   }
79 }