Springify: Die `@PreDestroy`-Methode wird nicht benötigt
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.springframework.beans.factory.annotation.Autowired;
7 import org.springframework.beans.factory.annotation.Value;
8 import org.springframework.kafka.annotation.KafkaListener;
9 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
10 import org.springframework.stereotype.Component;
11
12 import java.util.List;
13 import java.util.function.Consumer;
14
15
16 @Component
17 @Slf4j
18 @RequiredArgsConstructor
19 public class EndlessConsumer<K, V>
20 {
21   @Autowired
22   private KafkaListenerEndpointRegistry registry;
23   @Value("${consumer.client-id}")
24   String id;
25   @Autowired
26   Consumer<ConsumerRecord<K, V>> handler;
27
28   private long consumed = 0;
29
30   @KafkaListener(
31       id = "${consumer.client-id}",
32       idIsGroup = false,
33       topics = "${consumer.topic}",
34       containerFactory = "batchFactory",
35       autoStartup = "false")
36   public void receive(List<ConsumerRecord<K, V>> records)
37   {
38     // Do something with the data...
39     log.info("{} - Received {} messages", id, records.size());
40     for (ConsumerRecord<K, V> record : records)
41     {
42       log.info(
43           "{} - {}: {}/{} - {}={}",
44           id,
45           record.offset(),
46           record.topic(),
47           record.partition(),
48           record.key(),
49           record.value()
50       );
51
52       handler.accept(record);
53
54       consumed++;
55     }
56   }
57
58
59   public synchronized void start()
60   {
61     if (registry.getListenerContainer(id).isChildRunning())
62       throw new IllegalStateException("Consumer instance " + id + " is already running!");
63
64     log.info("{} - Starting - consumed {} messages before", id, consumed);
65     registry.getListenerContainer(id).start();
66   }
67
68   public synchronized void stop()
69   {
70     if (!registry.getListenerContainer(id).isChildRunning())
71       throw new IllegalStateException("Consumer instance " + id + " is not running!");
72
73     log.info("{} - Stopping", id);
74     registry.getListenerContainer(id).stop();
75     log.info("{} - Stopped - consumed {} messages so far", id, consumed);
76   }
77 }