`EndlessConsumer` auf `@KafkaHandler` umgestellt
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.springframework.kafka.annotation.KafkaHandler;
7 import org.springframework.kafka.annotation.KafkaListener;
8 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
9 import org.springframework.kafka.support.KafkaHeaders;
10 import org.springframework.messaging.handler.annotation.Header;
11 import org.springframework.messaging.handler.annotation.Payload;
12
13 import java.util.List;
14 import java.util.Optional;
15
16
17 @RequiredArgsConstructor
18 @Slf4j
19 @KafkaListener(
20     id = "${spring.kafka.client-id}",
21     idIsGroup = false,
22     topics = "${sumup.adder.topic}",
23     autoStartup = "false")
24 public class EndlessConsumer
25 {
26   private final String id;
27   private final KafkaListenerEndpointRegistry registry;
28   private final ApplicationErrorHandler errorHandler;
29   private final RecordHandler recordHandler;
30
31   private long consumed = 0;
32
33
34   @KafkaHandler
35   public void addNumber(
36     @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
37     @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
38     @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
39     @Header(KafkaHeaders.OFFSET) Long offset,
40     @Payload MessageAddNumber message)
41   {
42           log.info(
43               "{} - {}: {}/{} - {}={}",
44               id,
45               offset,
46               topic,
47               partition,
48               key,
49               message
50           );
51
52           recordHandler.addNumber(topic, partition, offset, key, message);
53
54           consumed++;
55   }
56
57   @KafkaHandler
58   public void calculateSum(
59     @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
60     @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
61     @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
62     @Header(KafkaHeaders.OFFSET) Long offset,
63     @Payload MessageCalculateSum message)
64   {
65           log.info(
66               "{} - {}: {}/{} - {}={}",
67               id,
68               offset,
69               topic,
70               partition,
71               key,
72               message
73           );
74
75           recordHandler.calculateSum(topic, partition, offset, key, message);
76
77           consumed++;
78   }
79
80   public void start()
81   {
82     if (running())
83       throw new IllegalStateException("Consumer instance " + id + " is already running!");
84
85     log.info("{} - Starting - consumed {} messages before", id, consumed);
86     errorHandler.clearState();
87     registry.getListenerContainer(id).start();
88   }
89
90   public void stop()
91   {
92     if (!running())
93       throw new IllegalStateException("Consumer instance " + id + " is not running!");
94
95     log.info("{} - Stopping", id);
96     registry.getListenerContainer(id).stop();
97     log.info("{} - Stopped - consumed {} messages so far", id, consumed);
98   }
99
100   public boolean running()
101   {
102     return registry.getListenerContainer(id).isRunning();
103   }
104
105   public Optional<Exception> exitStatus()
106   {
107     if (running())
108       throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
109
110     return errorHandler.getException();
111   }
112 }