01397a208e4ee4ef6430d1d46b5f68b2e73edece
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.springframework.kafka.annotation.KafkaListener;
7 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
8
9 import java.util.List;
10 import java.util.Optional;
11
12
13 @RequiredArgsConstructor
14 @Slf4j
15 public class EndlessConsumer<K, V>
16 {
17   private final String id;
18   private final KafkaListenerEndpointRegistry registry;
19   private final ApplicationErrorHandler errorHandler;
20   private final RecordHandler<K, V> recordHandler;
21
22   private long consumed = 0;
23
24
25   @KafkaListener(
26       id = "${spring.kafka.client-id}",
27       idIsGroup = false,
28       topics = "${sumup.adder.topic}",
29       autoStartup = "false")
30   public void accept(ConsumerRecord<K, V> record)
31   {
32           log.info(
33               "{} - {}: {}/{} - {}={}",
34               id,
35               record.offset(),
36               record.topic(),
37               record.partition(),
38               record.key(),
39               record.value()
40           );
41
42           recordHandler.accept(record);
43
44           consumed++;
45   }
46
47   public void start()
48   {
49     if (running())
50       throw new IllegalStateException("Consumer instance " + id + " is already running!");
51
52     log.info("{} - Starting - consumed {} messages before", id, consumed);
53     errorHandler.clearState();
54     registry.getListenerContainer(id).start();
55   }
56
57   public void stop()
58   {
59     if (!running())
60       throw new IllegalStateException("Consumer instance " + id + " is not running!");
61
62     log.info("{} - Stopping", id);
63     registry.getListenerContainer(id).stop();
64     log.info("{} - Stopped - consumed {} messages so far", id, consumed);
65   }
66
67   public boolean running()
68   {
69     return registry.getListenerContainer(id).isRunning();
70   }
71
72   public Optional<Exception> exitStatus()
73   {
74     if (running())
75       throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
76
77     return errorHandler.getException();
78   }
79 }