Service ergänzt, der das Dead-Letter-Topic ausliest
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.springframework.kafka.annotation.KafkaHandler;
7 import org.springframework.kafka.annotation.KafkaListener;
8 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
9 import org.springframework.kafka.support.KafkaHeaders;
10 import org.springframework.messaging.handler.annotation.Header;
11 import org.springframework.messaging.handler.annotation.Payload;
12
13 import java.util.List;
14 import java.util.Optional;
15
16
17 @RequiredArgsConstructor
18 @Slf4j
19 @KafkaListener(
20     id = "${spring.kafka.client-id}",
21     idIsGroup = false,
22     topics = "${sumup.adder.topic}",
23     autoStartup = "false")
24 public class EndlessConsumer
25 {
26   private final String id;
27   private final KafkaListenerEndpointRegistry registry;
28   private final RecordHandler recordHandler;
29
30   private long consumed = 0;
31
32
33   @KafkaHandler
34   public void addNumber(
35     @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
36     @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
37     @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
38     @Header(KafkaHeaders.OFFSET) Long offset,
39     @Payload MessageAddNumber message)
40   {
41           log.info(
42               "{} - {}: {}/{} - {}={}",
43               id,
44               offset,
45               topic,
46               partition,
47               key,
48               message
49           );
50
51           recordHandler.addNumber(topic, partition, offset, key, message);
52
53           consumed++;
54   }
55
56   @KafkaHandler
57   public void calculateSum(
58     @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
59     @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
60     @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
61     @Header(KafkaHeaders.OFFSET) Long offset,
62     @Payload MessageCalculateSum message)
63   {
64           log.info(
65               "{} - {}: {}/{} - {}={}",
66               id,
67               offset,
68               topic,
69               partition,
70               key,
71               message
72           );
73
74           recordHandler.calculateSum(topic, partition, offset, key, message);
75
76           consumed++;
77   }
78
79   public void start()
80   {
81     if (running())
82       throw new IllegalStateException("Consumer instance " + id + " is already running!");
83
84     log.info("{} - Starting - consumed {} messages before", id, consumed);
85     registry.getListenerContainer(id).start();
86   }
87
88   public void stop()
89   {
90     if (!running())
91       throw new IllegalStateException("Consumer instance " + id + " is not running!");
92
93     log.info("{} - Stopping", id);
94     registry.getListenerContainer(id).stop();
95     log.info("{} - Stopped - consumed {} messages so far", id, consumed);
96   }
97
98   public boolean running()
99   {
100     return registry.getListenerContainer(id).isRunning();
101   }
102 }