Automatically rebuild the state after a crash / restart
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / adapter / TransferConsumer.java
1 package de.juplo.kafka.payment.transfer.adapter;
2
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.domain.Transfer;
6 import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
7 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
8 import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
9 import lombok.RequiredArgsConstructor;
10 import lombok.extern.slf4j.Slf4j;
11 import org.apache.kafka.clients.consumer.ConsumerRecord;
12 import org.apache.kafka.clients.consumer.ConsumerRecords;
13 import org.apache.kafka.clients.consumer.KafkaConsumer;
14 import org.apache.kafka.common.TopicPartition;
15 import org.apache.kafka.common.errors.WakeupException;
16 import org.springframework.context.event.ContextRefreshedEvent;
17 import org.springframework.context.event.EventListener;
18 import org.springframework.web.bind.annotation.PostMapping;
19 import org.springframework.web.bind.annotation.RequestMapping;
20 import org.springframework.web.bind.annotation.ResponseBody;
21
22 import java.time.Duration;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Future;
28 import java.util.stream.Collectors;
29
30 import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED;
31
32
33 @RequestMapping("/consumer")
34 @ResponseBody
35 @RequiredArgsConstructor
36 @Slf4j
37 public class TransferConsumer implements Runnable
38 {
39   private final String topic;
40   private final KafkaConsumer<String, String> consumer;
41   private final ExecutorService executorService;
42   private final ObjectMapper mapper;
43   private final ConsumerUseCases productionUseCases, restoreUseCases;
44
45   private boolean restoring = true;
46   private boolean running = false;
47   private boolean shutdown = false;
48   private Future<?> future = null;
49
50
51   @Override
52   public void run()
53   {
54     while (running)
55     {
56       try
57       {
58         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
59         if (records.count() == 0)
60           continue;
61
62         log.debug("polled {} records", records.count());
63         records.forEach(record -> handleRecord(record, productionUseCases));
64       }
65       catch (WakeupException e)
66       {
67         log.info("cleanly interrupted while polling");
68       }
69     }
70
71     log.info("polling stopped");
72   }
73
74   private void handleRecord(ConsumerRecord<String, String> record, ConsumerUseCases useCases)
75   {
76     try
77     {
78       byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
79
80       switch (eventType)
81       {
82         case EventType.NEW_TRANSFER:
83
84           NewTransferEvent newTransferEvent =
85               mapper.readValue(record.value(), NewTransferEvent.class);
86           useCases.create(newTransferEvent.toTransfer().setState(CREATED));
87           break;
88
89         case EventType.TRANSFER_STATE_CHANGED:
90
91           TransferStateChangedEvent stateChangedEvent =
92               mapper.readValue(record.value(), TransferStateChangedEvent.class);
93           useCases
94               .get(stateChangedEvent.getId())
95               .ifPresentOrElse(
96                   transfer -> useCases.handle(transfer.setState(stateChangedEvent.getState())),
97                   () -> log.error("unknown transfer: {}", stateChangedEvent.getId()));
98           break;
99       }
100     }
101     catch (JsonProcessingException e)
102     {
103       log.error(
104           "ignoring invalid json in message #{} on {}/{}: {}",
105           record.offset(),
106           record.topic(),
107           record.partition(),
108           record.value());
109     }
110   }
111
112   @EventListener
113   public synchronized void onApplicationEvent(ContextRefreshedEvent event)
114   {
115     // Needed, because this method is called synchronously during the
116     // initialization pahse of Spring. If the restoring is processed
117     // in the same thread, it would block the completion of the initialization.
118     // Hence, the app would not react to any signal (CTRL-C, for example) except
119     // a KILL until the restoring is finished.
120     future = executorService.submit(() -> restore());
121   }
122
123   private void restore()
124   {
125     log.info("--> starting restore...");
126
127     List<TopicPartition> partitions =
128         consumer
129             .partitionsFor(topic)
130             .stream()
131             .map(info -> new TopicPartition(topic, info.partition()))
132             .collect(Collectors.toList());
133
134     Map<Integer, Long> lastSeen =
135         consumer
136             .endOffsets(partitions)
137             .entrySet()
138             .stream()
139             .collect(Collectors.toMap(
140                 entry -> entry.getKey().partition(),
141                 entry -> entry.getValue() - 1));
142
143     Map<Integer, Long> positions =
144         lastSeen
145             .keySet()
146             .stream()
147             .collect(Collectors.toMap(
148                 partition -> partition,
149                 partition -> 0l));
150
151     log.info("assigning {}}", partitions);
152     consumer.assign(partitions);
153
154     while (
155         restoring &&
156         positions
157             .entrySet()
158             .stream()
159             .map(entry -> entry.getValue() < lastSeen.get(entry.getKey()))
160             .reduce(false, (a, b) -> a || b))
161     {
162       try
163       {
164         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
165         if (records.count() == 0)
166           continue;
167
168         log.debug("polled {} records", records.count());
169         records.forEach(record ->
170         {
171           handleRecord(record, restoreUseCases);
172           positions.put(record.partition(), record.offset());
173         });
174       }
175       catch(WakeupException e)
176       {
177         log.info("--> cleanly interrupted while restoring");
178         return;
179       }
180     }
181
182     log.info("--> restore completed!");
183     restoring = false;
184
185     // We are intentionally _not_ unsubscribing here, since that would
186     // reset the offset to _earliest_, because we disabled offset-commits.
187
188     start();
189   }
190
191   @PostMapping("start")
192   public synchronized String start()
193   {
194     if (restoring)
195     {
196       log.error("cannot start while restoring");
197       return "Denied: Restoring!";
198     }
199
200     String result = "Started";
201
202     if (running)
203     {
204       stop();
205       result = "Restarted";
206     }
207
208     running = true;
209     future = executorService.submit(this);
210
211     log.info("started");
212     return result;
213   }
214
215   @PostMapping("stop")
216   public synchronized String stop()
217   {
218     if (!(running || restoring))
219     {
220       log.info("not running!");
221       return "Not running";
222     }
223
224     running = false;
225
226     if (!future.isDone())
227       consumer.wakeup();
228
229     log.info("waiting for the consumer...");
230     try
231     {
232       future.get();
233     }
234     catch (InterruptedException|ExecutionException e)
235     {
236       log.error("Exception while joining polling task!", e);
237       return e.getMessage();
238     }
239     finally
240     {
241       future = null;
242     }
243
244     log.info("stopped");
245     return "Stopped";
246   }
247
248   public synchronized void shutdown()
249   {
250     log.info("shutdown initiated!");
251     shutdown = true;
252     stop();
253     log.info("closing consumer");
254     consumer.close();
255   }
256
257
258   public interface ConsumerUseCases
259       extends
260         GetTransferUseCase,
261         CreateTransferUseCase,
262         HandleStateChangeUseCase {};
263 }