63fbef5f523bb8a7d9cc1f838e4d51726ace008d
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / adapter / TransferConsumer.java
1 package de.juplo.kafka.payment.transfer.adapter;
2
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
6 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
7 import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
8 import lombok.RequiredArgsConstructor;
9 import lombok.extern.slf4j.Slf4j;
10 import org.apache.kafka.clients.consumer.ConsumerRecord;
11 import org.apache.kafka.clients.consumer.ConsumerRecords;
12 import org.apache.kafka.clients.consumer.KafkaConsumer;
13 import org.apache.kafka.common.TopicPartition;
14 import org.apache.kafka.common.errors.WakeupException;
15 import org.springframework.context.event.ContextRefreshedEvent;
16 import org.springframework.context.event.EventListener;
17 import org.springframework.web.bind.annotation.PostMapping;
18 import org.springframework.web.bind.annotation.RequestMapping;
19 import org.springframework.web.bind.annotation.ResponseBody;
20
21 import java.time.Duration;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Future;
27 import java.util.stream.Collectors;
28
29
30 @RequestMapping("/consumer")
31 @ResponseBody
32 @RequiredArgsConstructor
33 @Slf4j
34 public class TransferConsumer implements Runnable
35 {
36   private final String topic;
37   private final KafkaConsumer<String, String> consumer;
38   private final ExecutorService executorService;
39   private final ObjectMapper mapper;
40   private final ConsumerUseCases productionUseCases, restoreUseCases;
41
42   private boolean restoring = true;
43   private boolean running = false;
44   private boolean shutdown = false;
45   private Future<?> future = null;
46
47
48   @Override
49   public void run()
50   {
51     while (running)
52     {
53       try
54       {
55         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
56         if (records.count() == 0)
57           continue;
58
59         log.debug("polled {} records", records.count());
60         records.forEach(record -> handleRecord(record, productionUseCases));
61       }
62       catch (WakeupException e)
63       {
64         log.info("cleanly interrupted while polling");
65       }
66     }
67
68     log.info("polling stopped");
69   }
70
71   private void handleRecord(ConsumerRecord<String, String> record, ConsumerUseCases useCases)
72   {
73     try
74     {
75       byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
76
77       switch (eventType)
78       {
79         case EventType.NEW_TRANSFER:
80
81           NewTransferEvent newTransferEvent =
82               mapper.readValue(record.value(), NewTransferEvent.class);
83           useCases
84               .create(
85                   newTransferEvent.getId(),
86                   newTransferEvent.getPayer(),
87                   newTransferEvent.getPayee(),
88                   newTransferEvent.getAmount());
89           break;
90
91         case EventType.TRANSFER_STATE_CHANGED:
92
93           TransferStateChangedEvent stateChangedEvent =
94               mapper.readValue(record.value(), TransferStateChangedEvent.class);
95           useCases.handleStateChange(stateChangedEvent.getId(), stateChangedEvent.getState());
96           break;
97       }
98     }
99     catch (JsonProcessingException e)
100     {
101       log.error(
102           "ignoring invalid json in message #{} on {}/{}: {}",
103           record.offset(),
104           record.topic(),
105           record.partition(),
106           record.value());
107     }
108   }
109
110   @EventListener
111   public synchronized void onApplicationEvent(ContextRefreshedEvent event)
112   {
113     // Needed, because this method is called synchronously during the
114     // initialization pahse of Spring. If the restoring is processed
115     // in the same thread, it would block the completion of the initialization.
116     // Hence, the app would not react to any signal (CTRL-C, for example) except
117     // a KILL until the restoring is finished.
118     future = executorService.submit(() -> restore());
119   }
120
121   private void restore()
122   {
123     log.info("--> starting restore...");
124
125     List<TopicPartition> partitions =
126         consumer
127             .partitionsFor(topic)
128             .stream()
129             .map(info -> new TopicPartition(topic, info.partition()))
130             .collect(Collectors.toList());
131
132     Map<Integer, Long> lastSeen =
133         consumer
134             .endOffsets(partitions)
135             .entrySet()
136             .stream()
137             .collect(Collectors.toMap(
138                 entry -> entry.getKey().partition(),
139                 entry -> entry.getValue() - 1));
140
141     Map<Integer, Long> positions =
142         lastSeen
143             .keySet()
144             .stream()
145             .collect(Collectors.toMap(
146                 partition -> partition,
147                 partition -> 0l));
148
149     log.info("assigning {}}", partitions);
150     consumer.assign(partitions);
151
152     while (
153         restoring &&
154         positions
155             .entrySet()
156             .stream()
157             .map(entry -> entry.getValue() < lastSeen.get(entry.getKey()))
158             .reduce(false, (a, b) -> a || b))
159     {
160       try
161       {
162         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
163         if (records.count() == 0)
164           continue;
165
166         log.debug("polled {} records", records.count());
167         records.forEach(record ->
168         {
169           handleRecord(record, restoreUseCases);
170           positions.put(record.partition(), record.offset());
171         });
172       }
173       catch(WakeupException e)
174       {
175         log.info("--> cleanly interrupted while restoring");
176         return;
177       }
178     }
179
180     log.info("--> restore completed!");
181     restoring = false;
182
183     // We are intentionally _not_ unsubscribing here, since that would
184     // reset the offset to _earliest_, because we disabled offset-commits.
185
186     start();
187   }
188
189   @PostMapping("start")
190   public synchronized String start()
191   {
192     if (restoring)
193     {
194       log.error("cannot start while restoring");
195       return "Denied: Restoring!";
196     }
197
198     String result = "Started";
199
200     if (running)
201     {
202       stop();
203       result = "Restarted";
204     }
205
206     running = true;
207     future = executorService.submit(this);
208
209     log.info("started");
210     return result;
211   }
212
213   @PostMapping("stop")
214   public synchronized String stop()
215   {
216     if (!(running || restoring))
217     {
218       log.info("not running!");
219       return "Not running";
220     }
221
222     running = false;
223
224     if (!future.isDone())
225       consumer.wakeup();
226
227     log.info("waiting for the consumer...");
228     try
229     {
230       future.get();
231     }
232     catch (InterruptedException|ExecutionException e)
233     {
234       log.error("Exception while joining polling task!", e);
235       return e.getMessage();
236     }
237     finally
238     {
239       future = null;
240     }
241
242     log.info("stopped");
243     return "Stopped";
244   }
245
246   public synchronized void shutdown()
247   {
248     log.info("shutdown initiated!");
249     shutdown = true;
250     stop();
251     log.info("closing consumer");
252     consumer.close();
253   }
254
255
256   public interface ConsumerUseCases
257       extends
258         GetTransferUseCase,
259         CreateTransferUseCase,
260         HandleStateChangeUseCase {};
261 }