Simplified the thread execution
[demos/kafka/demos-kafka-payment-system-transfer] src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java
package de.juplo.kafka.payment.transfer.adapter;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;


@RequestMapping("/consumer")
@ResponseBody
@RequiredArgsConstructor
@Slf4j
public class TransferConsumer implements Runnable
{
  private final String topic;
  private final KafkaConsumer<String, String> consumer;
  private final ObjectMapper mapper;
  private final ConsumerUseCases productionUseCases, restoreUseCases;

  private boolean restoring = true;
  private boolean running = false;
  private boolean shutdown = false;
  private Future<?> future = null;


  @Override
  public void run()
  {
    while (running)
    {
      try
      {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        if (records.count() == 0)
          continue;

        log.debug("polled {} records", records.count());
        records.forEach(record -> handleRecord(record, productionUseCases));
      }
      catch (WakeupException e)
      {
        log.info("cleanly interrupted while polling");
      }
    }

    log.info("polling stopped");
  }

  private void handleRecord(ConsumerRecord<String, String> record, ConsumerUseCases useCases)
  {
    try
    {
      byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];

      switch (eventType)
      {
        case EventType.NEW_TRANSFER:

          NewTransferEvent newTransferEvent =
              mapper.readValue(record.value(), NewTransferEvent.class);
          useCases
              .create(
                  newTransferEvent.getId(),
                  newTransferEvent.getPayer(),
                  newTransferEvent.getPayee(),
                  newTransferEvent.getAmount());
          break;

        case EventType.TRANSFER_STATE_CHANGED:

          TransferStateChangedEvent stateChangedEvent =
              mapper.readValue(record.value(), TransferStateChangedEvent.class);
          useCases.handleStateChange(stateChangedEvent.getId(), stateChangedEvent.getState());
          break;
      }
    }
    catch (JsonProcessingException e)
    {
      log.error(
          "ignoring invalid json in message #{} on {}/{}: {}",
          record.offset(),
          record.topic(),
          record.partition(),
          record.value());
    }
    catch (IllegalArgumentException e)
    {
      log.error(
          "ignoring invalid message #{} on {}/{}: {}, message={}",
          record.offset(),
          record.topic(),
          record.partition(),
          e.getMessage(),
          record.value());
    }
  }

  @EventListener
  public synchronized void onApplicationEvent(ContextRefreshedEvent event)
  {
    // Needed, because this method is called synchronously during the
    // initialization phase of Spring. If the restore was processed in the
    // same thread, it would block the completion of the initialization.
    // Hence, the app would not react to any signal (CTRL-C, for example)
    // except a KILL until the restore is finished.
    future = CompletableFuture.runAsync(() -> restore());
  }

  private void restore()
  {
    log.info("--> starting restore...");

    List<TopicPartition> partitions =
        consumer
            .partitionsFor(topic)
            .stream()
            .map(info -> new TopicPartition(topic, info.partition()))
            .collect(Collectors.toList());

    // endOffsets() returns the offset of the _next_ record to be written,
    // so the offset of the last existing record is endOffset - 1.
    Map<Integer, Long> lastSeen =
        consumer
            .endOffsets(partitions)
            .entrySet()
            .stream()
            .collect(Collectors.toMap(
                entry -> entry.getKey().partition(),
                entry -> entry.getValue() - 1));

    // -1 marks "nothing consumed yet"; starting at 0 would skip partitions
    // that hold exactly one record.
    Map<Integer, Long> positions =
        lastSeen
            .keySet()
            .stream()
            .collect(Collectors.toMap(
                partition -> partition,
                partition -> -1L));

    log.info("assigning {}", partitions);
    consumer.assign(partitions);

    while (
        restoring &&
        positions
            .entrySet()
            .stream()
            .anyMatch(entry -> entry.getValue() < lastSeen.get(entry.getKey())))
    {
      try
      {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        if (records.count() == 0)
          continue;

        log.debug("polled {} records", records.count());
        records.forEach(record ->
        {
          handleRecord(record, restoreUseCases);
          positions.put(record.partition(), record.offset());
        });
      }
      catch (WakeupException e)
      {
        log.info("--> cleanly interrupted while restoring");
        return;
      }
    }

    log.info("--> restore completed!");
    restoring = false;

    // We are intentionally _not_ unsubscribing here, since that would
    // reset the offset to _earliest_, because we disabled offset-commits.

    start();
  }

  @PostMapping("start")
  public synchronized String start()
  {
    if (restoring)
    {
      log.error("cannot start while restoring");
      return "Denied: Restoring!";
    }

    String result = "Started";

    if (running)
    {
      stop();
      result = "Restarted";
    }

    running = true;
    future = CompletableFuture.runAsync(this);

    log.info("started");
    return result;
  }

  @PostMapping("stop")
  public synchronized String stop()
  {
    if (!(running || restoring))
    {
      log.info("not running!");
      return "Not running";
    }

    running = false;

    if (!future.isDone())
      consumer.wakeup();

    log.info("waiting for the consumer...");
    try
    {
      future.get();
    }
    catch (InterruptedException|ExecutionException e)
    {
      log.error("Exception while joining polling task!", e);
      return e.getMessage();
    }
    finally
    {
      future = null;
    }

    log.info("stopped");
    return "Stopped";
  }

  public synchronized void shutdown()
  {
    log.info("shutdown initiated!");
    shutdown = true;
    stop();
    log.info("closing consumer");
    consumer.close();
  }


  public interface ConsumerUseCases
      extends
        GetTransferUseCase,
        CreateTransferUseCase,
        HandleStateChangeUseCase {}
}
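
For context, a minimal sketch of how such a consumer could be wired up in a Spring application. Everything in it is an assumption for illustration: the configuration class, the TransferService standing in for both ConsumerUseCases arguments, the topic name and the connection properties are not part of the file shown above; only the constructor order (topic, consumer, mapper, production use cases, restore use cases) and the shutdown() method come from it.

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;


@Configuration
public class TransferConsumerConfiguration
{
  @Bean(destroyMethod = "shutdown") // calls TransferConsumer.shutdown() when the context closes
  public TransferConsumer transferConsumer(TransferService transferService) // hypothetical ConsumerUseCases implementation
  {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");   // assumption
    props.put("group.id", "transfer");                  // assumption
    // No offset-commits: restore() re-reads the assigned partitions on every
    // start-up, and auto.offset.reset=earliest lets poll() begin at the first
    // available offset.
    props.put("enable.auto.commit", "false");
    props.put("auto.offset.reset", "earliest");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    return new TransferConsumer(
        "transfer",                  // topic name -- assumption
        new KafkaConsumer<>(props),
        new ObjectMapper(),
        transferService,             // use cases for normal processing
        transferService);            // use cases applied while restoring
  }
}

With this wiring the restore starts automatically through the ContextRefreshedEvent listener, and the consumer can afterwards be toggled via POST /consumer/start and POST /consumer/stop, which map to the start() and stop() methods shown above.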