Ignoring poision pills with illeagal state-changes
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / adapter / TransferConsumer.java
1 package de.juplo.kafka.payment.transfer.adapter;
2
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
6 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
7 import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
8 import lombok.RequiredArgsConstructor;
9 import lombok.extern.slf4j.Slf4j;
10 import org.apache.kafka.clients.consumer.ConsumerRecord;
11 import org.apache.kafka.clients.consumer.ConsumerRecords;
12 import org.apache.kafka.clients.consumer.KafkaConsumer;
13 import org.apache.kafka.common.TopicPartition;
14 import org.apache.kafka.common.errors.WakeupException;
15 import org.springframework.context.event.ContextRefreshedEvent;
16 import org.springframework.context.event.EventListener;
17 import org.springframework.web.bind.annotation.PostMapping;
18 import org.springframework.web.bind.annotation.RequestMapping;
19 import org.springframework.web.bind.annotation.ResponseBody;
20
21 import java.time.Duration;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Future;
27 import java.util.stream.Collectors;
28
29
30 @RequestMapping("/consumer")
31 @ResponseBody
32 @RequiredArgsConstructor
33 @Slf4j
34 public class TransferConsumer implements Runnable
35 {
36   private final String topic;
37   private final KafkaConsumer<String, String> consumer;
38   private final ExecutorService executorService;
39   private final ObjectMapper mapper;
40   private final ConsumerUseCases productionUseCases, restoreUseCases;
41
42   private boolean restoring = true;
43   private boolean running = false;
44   private boolean shutdown = false;
45   private Future<?> future = null;
46
47
48   @Override
49   public void run()
50   {
51     while (running)
52     {
53       try
54       {
55         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
56         if (records.count() == 0)
57           continue;
58
59         log.debug("polled {} records", records.count());
60         records.forEach(record -> handleRecord(record, productionUseCases));
61       }
62       catch (WakeupException e)
63       {
64         log.info("cleanly interrupted while polling");
65       }
66     }
67
68     log.info("polling stopped");
69   }
70
71   private void handleRecord(ConsumerRecord<String, String> record, ConsumerUseCases useCases)
72   {
73     try
74     {
75       byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
76
77       switch (eventType)
78       {
79         case EventType.NEW_TRANSFER:
80
81           NewTransferEvent newTransferEvent =
82               mapper.readValue(record.value(), NewTransferEvent.class);
83           useCases
84               .create(
85                   newTransferEvent.getId(),
86                   newTransferEvent.getPayer(),
87                   newTransferEvent.getPayee(),
88                   newTransferEvent.getAmount());
89           break;
90
91         case EventType.TRANSFER_STATE_CHANGED:
92
93           TransferStateChangedEvent stateChangedEvent =
94               mapper.readValue(record.value(), TransferStateChangedEvent.class);
95           useCases.handleStateChange(stateChangedEvent.getId(), stateChangedEvent.getState());
96           break;
97       }
98     }
99     catch (JsonProcessingException e)
100     {
101       log.error(
102           "ignoring invalid json in message #{} on {}/{}: {}",
103           record.offset(),
104           record.topic(),
105           record.partition(),
106           record.value());
107     }
108     catch (IllegalArgumentException e)
109     {
110       log.error(
111           "ignoring invalid message #{} on {}/{}: {}, message={}",
112           record.offset(),
113           record.topic(),
114           record.partition(),
115           e.getMessage(),
116           record.value());
117     }
118   }
119
120   @EventListener
121   public synchronized void onApplicationEvent(ContextRefreshedEvent event)
122   {
123     // Needed, because this method is called synchronously during the
124     // initialization pahse of Spring. If the restoring is processed
125     // in the same thread, it would block the completion of the initialization.
126     // Hence, the app would not react to any signal (CTRL-C, for example) except
127     // a KILL until the restoring is finished.
128     future = executorService.submit(() -> restore());
129   }
130
131   private void restore()
132   {
133     log.info("--> starting restore...");
134
135     List<TopicPartition> partitions =
136         consumer
137             .partitionsFor(topic)
138             .stream()
139             .map(info -> new TopicPartition(topic, info.partition()))
140             .collect(Collectors.toList());
141
142     Map<Integer, Long> lastSeen =
143         consumer
144             .endOffsets(partitions)
145             .entrySet()
146             .stream()
147             .collect(Collectors.toMap(
148                 entry -> entry.getKey().partition(),
149                 entry -> entry.getValue() - 1));
150
151     Map<Integer, Long> positions =
152         lastSeen
153             .keySet()
154             .stream()
155             .collect(Collectors.toMap(
156                 partition -> partition,
157                 partition -> 0l));
158
159     log.info("assigning {}}", partitions);
160     consumer.assign(partitions);
161
162     while (
163         restoring &&
164         positions
165             .entrySet()
166             .stream()
167             .map(entry -> entry.getValue() < lastSeen.get(entry.getKey()))
168             .reduce(false, (a, b) -> a || b))
169     {
170       try
171       {
172         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
173         if (records.count() == 0)
174           continue;
175
176         log.debug("polled {} records", records.count());
177         records.forEach(record ->
178         {
179           handleRecord(record, restoreUseCases);
180           positions.put(record.partition(), record.offset());
181         });
182       }
183       catch(WakeupException e)
184       {
185         log.info("--> cleanly interrupted while restoring");
186         return;
187       }
188     }
189
190     log.info("--> restore completed!");
191     restoring = false;
192
193     // We are intentionally _not_ unsubscribing here, since that would
194     // reset the offset to _earliest_, because we disabled offset-commits.
195
196     start();
197   }
198
199   @PostMapping("start")
200   public synchronized String start()
201   {
202     if (restoring)
203     {
204       log.error("cannot start while restoring");
205       return "Denied: Restoring!";
206     }
207
208     String result = "Started";
209
210     if (running)
211     {
212       stop();
213       result = "Restarted";
214     }
215
216     running = true;
217     future = executorService.submit(this);
218
219     log.info("started");
220     return result;
221   }
222
223   @PostMapping("stop")
224   public synchronized String stop()
225   {
226     if (!(running || restoring))
227     {
228       log.info("not running!");
229       return "Not running";
230     }
231
232     running = false;
233
234     if (!future.isDone())
235       consumer.wakeup();
236
237     log.info("waiting for the consumer...");
238     try
239     {
240       future.get();
241     }
242     catch (InterruptedException|ExecutionException e)
243     {
244       log.error("Exception while joining polling task!", e);
245       return e.getMessage();
246     }
247     finally
248     {
249       future = null;
250     }
251
252     log.info("stopped");
253     return "Stopped";
254   }
255
256   public synchronized void shutdown()
257   {
258     log.info("shutdown initiated!");
259     shutdown = true;
260     stop();
261     log.info("closing consumer");
262     consumer.close();
263   }
264
265
266   public interface ConsumerUseCases
267       extends
268         GetTransferUseCase,
269         CreateTransferUseCase,
270         HandleStateChangeUseCase {};
271 }