Auf `@KafkaHandler` umgestellt
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.springframework.kafka.annotation.KafkaListener;
7 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
8
9 import java.util.List;
10 import java.util.Optional;
11
12
13 @RequiredArgsConstructor
14 @Slf4j
15 public class EndlessConsumer<K, V>
16 {
17   private final String id;
18   private final KafkaListenerEndpointRegistry registry;
19   private final ApplicationErrorHandler errorHandler;
20   private final RecordHandler<K, V> recordHandler;
21
22   private long consumed = 0;
23
24
25   @KafkaListener(
26       id = "${spring.kafka.client-id}",
27       idIsGroup = false,
28       topics = "${sumup.adder.topic}",
29       batch = "true",
30       autoStartup = "false")
31   public void accept(List<ConsumerRecord<K, V>> records)
32   {
33         // Do something with the data...
34         log.info("{} - Received {} messages", id, records.size());
35         for (ConsumerRecord<K, V> record : records)
36         {
37           log.info(
38               "{} - {}: {}/{} - {}={}",
39               id,
40               record.offset(),
41               record.topic(),
42               record.partition(),
43               record.key(),
44               record.value()
45           );
46
47           recordHandler.accept(record);
48
49           consumed++;
50         }
51   }
52
53   public void start()
54   {
55     if (running())
56       throw new IllegalStateException("Consumer instance " + id + " is already running!");
57
58     log.info("{} - Starting - consumed {} messages before", id, consumed);
59     errorHandler.clearState();
60     registry.getListenerContainer(id).start();
61   }
62
63   public void stop()
64   {
65     if (!running())
66       throw new IllegalStateException("Consumer instance " + id + " is not running!");
67
68     log.info("{} - Stopping", id);
69     registry.getListenerContainer(id).stop();
70     log.info("{} - Stopped - consumed {} messages so far", id, consumed);
71   }
72
73   public boolean running()
74   {
75     return registry.getListenerContainer(id).isRunning();
76   }
77
78   public Optional<Exception> exitStatus()
79   {
80     if (running())
81       throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
82
83     return errorHandler.getException();
84   }
85 }