1 package de.juplo.kafka.payment.transfer.adapter;
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
6 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
7 import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
8 import lombok.RequiredArgsConstructor;
9 import lombok.extern.slf4j.Slf4j;
10 import org.apache.kafka.clients.consumer.ConsumerRecord;
11 import org.apache.kafka.clients.consumer.ConsumerRecords;
12 import org.apache.kafka.clients.consumer.KafkaConsumer;
13 import org.apache.kafka.common.TopicPartition;
14 import org.apache.kafka.common.errors.WakeupException;
15 import org.springframework.context.event.ContextRefreshedEvent;
16 import org.springframework.context.event.EventListener;
17 import org.springframework.web.bind.annotation.PostMapping;
18 import org.springframework.web.bind.annotation.RequestMapping;
19 import org.springframework.web.bind.annotation.ResponseBody;
21 import java.time.Duration;
22 import java.util.List;
24 import java.util.concurrent.CompletableFuture;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.Future;
27 import java.util.stream.Collectors;
// REST-controllable Kafka consumer: replays (restores) the transfer topic on
// startup and then runs the live polling loop (see run()) until stopped.
30 @RequestMapping("/consumer")
32 @RequiredArgsConstructor
34 public class TransferConsumer implements Runnable
// Topic the transfer events are read from.
36 private final String topic;
// NOTE(review): KafkaConsumer is not thread-safe; access appears confined to
// the polling/restore task (plus wakeup() from the lifecycle methods) — the
// relevant lines are elided here, confirm against the full file.
37 private final KafkaConsumer<String, String> consumer;
// Used to deserialize the JSON payload of consumed records.
38 private final ObjectMapper mapper;
// Two use-case bundles: one for live processing, one for replaying already
// written events during restore (same record handling, different effects).
39 private final ConsumerUseCases productionUseCases, restoreUseCases;
// Lifecycle flags — mutated from the synchronized lifecycle methods below.
41 private boolean restoring = true;
42 private boolean running = false;
43 private boolean shutdown = false;
// Handle of the currently running restore/polling task.
44 private Future<?> future = null;
// Live poll loop body (enclosing run() signature elided from this view):
// fetch up to 1s worth of records and dispatch each with the production
// use-cases.
54 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
55 if (records.count() == 0)
58 log.debug("polled {} records", records.count());
59 records.forEach(record -> handleRecord(record, productionUseCases));
// consumer.wakeup(), called from another thread, aborts poll() with this
// exception — used here as the clean shutdown/stop signal, not an error.
61 catch (WakeupException e)
63 log.info("cleanly interrupted while polling")
67 log.info("polling stopped");
// Deserializes a single record according to its EventType header and routes it
// to the matching use-case. Malformed records are logged and skipped so that a
// poison pill cannot halt consumption.
70 private void handleRecord(ConsumerRecord<String, String> record, ConsumerUseCases useCases)
// The event type is carried in the first byte of the EventType header.
74 byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
78 case EventType.NEW_TRANSFER:
80 NewTransferEvent newTransferEvent =
81 mapper.readValue(record.value(), NewTransferEvent.class);
84 newTransferEvent.getId(),
85 newTransferEvent.getPayer(),
86 newTransferEvent.getPayee(),
87 newTransferEvent.getAmount());
90 case EventType.TRANSFER_STATE_CHANGED:
92 TransferStateChangedEvent stateChangedEvent =
93 mapper.readValue(record.value(), TransferStateChangedEvent.class);
94 useCases.handleStateChange(stateChangedEvent.getId(), stateChangedEvent.getState());
// Invalid JSON payload: log & skip the record, keep consuming.
98 catch (JsonProcessingException e)
101 "ignoring invalid json in message #{} on {}/{}: {}",
// Unknown event type or invalid event data: likewise log & skip.
107 catch (IllegalArgumentException e)
110 "ignoring invalid message #{} on {}/{}: {}, message={}",
// Triggered once the Spring context has been fully initialized: kicks off the
// asynchronous restore of the local state from the event topic.
120 public synchronized void onApplicationEvent(ContextRefreshedEvent event)
122 // Needed, because this method is called synchronously during the
123 // initialization phase of Spring. If the restoring is processed
124 // in the same thread, it would block the completion of the initialization.
125 // Hence, the app would not react to any signal (CTRL-C, for example) except
126 // a KILL until the restoring is finished.
127 future = CompletableFuture.runAsync(() -> restore());
// Replays all events already present in the topic through the restore
// use-cases, so the in-memory state matches the log before live processing
// starts.
130 private void restore()
132 log.info("--> starting restore...");
// Every partition of the topic is replayed — this instance restores all state.
134 List<TopicPartition> partitions =
136 .partitionsFor(topic)
138 .map(info -> new TopicPartition(topic, info.partition()))
139 .collect(Collectors.toList());
// endOffsets() returns the offset of the NEXT record to be written, so the
// last existing record per partition is endOffset - 1.
// NOTE(review): for an empty partition this yields -1 — presumably the
// catch-up comparison below handles that; confirm against the elided lines.
141 Map<Integer, Long> lastSeen =
143 .endOffsets(partitions)
146 .collect(Collectors.toMap(
147 entry -> entry.getKey().partition(),
148 entry -> entry.getValue() - 1));
// Tracks the highest offset restored so far, per partition.
150 Map<Integer, Long> positions =
154 .collect(Collectors.toMap(
155 partition -> partition,
// NOTE(review): stray '}' in the format string — this logs a literal '}'
// after the partition list ("assigning [..]}"); should read "assigning {}".
158 log.info("assigning {}}", partitions);
159 consumer.assign(partitions);
// Keep polling until every partition has caught up to its lastSeen offset.
166 .map(entry -> entry.getValue() < lastSeen.get(entry.getKey()))
167 .reduce(false, (a, b) -> a || b))
171 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
172 if (records.count() == 0)
175 log.debug("polled {} records", records.count());
176 records.forEach(record ->
// Replay each record with the restore use-cases and advance the position.
178 handleRecord(record, restoreUseCases);
179 positions.put(record.partition(), record.offset());
182 catch(WakeupException e)
184 log.info("--> cleanly interrupted while restoring");
189 log.info("--> restore completed!");
192 // We are intentionally _not_ unsubscribing here, since that would
193 // reset the offset to _earliest_, because we disabled offset-commits.
// Starts (or restarts) the live polling loop via POST /consumer/start.
// Rejected while the initial restore is still running.
198 @PostMapping("start")
199 public synchronized String start()
203 log.error("cannot start while restoring");
204 return "Denied: Restoring!";
207 String result = "Started";
212 result = "Restarted";
// Runs this.run() asynchronously; the Future is kept so stop() can join it.
216 future = CompletableFuture.runAsync(this);
// Stops the polling/restore task (if one is active) and waits for it to end.
223 public synchronized String stop()
225 if (!(running || restoring))
227 log.info("not running!");
228 return "Not running";
// Block until the background task has actually terminated.
233 if (!future.isDone())
236 log.info("waiting for the consumer...");
// NOTE(review): InterruptedException is caught without re-interrupting the
// current thread (Thread.currentThread().interrupt()) — consider restoring
// the interrupt flag before returning.
241 catch (InterruptedException|ExecutionException e)
243 log.error("Exception while joining polling task!", e);
244 return e.getMessage();
// Application shutdown hook: stops the consumer task and closes the
// KafkaConsumer, releasing its network resources.
255 public synchronized void shutdown()
257 log.info("shutdown initiated!");
260 log.info("closing consumer");
// Aggregates the use-cases a consumed record may trigger, so handleRecord()
// can serve both live processing and restore through a single parameter.
265 public interface ConsumerUseCases
268 CreateTransferUseCase,
269 HandleStateChangeUseCase {};