1 package de.juplo.kafka.payment.transfer.adapter;
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.domain.Transfer;
6 import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
7 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
8 import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
9 import lombok.RequiredArgsConstructor;
10 import lombok.extern.slf4j.Slf4j;
11 import org.apache.kafka.clients.consumer.ConsumerRecord;
12 import org.apache.kafka.clients.consumer.ConsumerRecords;
13 import org.apache.kafka.clients.consumer.KafkaConsumer;
14 import org.apache.kafka.common.TopicPartition;
15 import org.apache.kafka.common.errors.WakeupException;
16 import org.springframework.context.event.ContextRefreshedEvent;
17 import org.springframework.context.event.EventListener;
18 import org.springframework.web.bind.annotation.PostMapping;
19 import org.springframework.web.bind.annotation.RequestMapping;
20 import org.springframework.web.bind.annotation.ResponseBody;
22 import java.time.Duration;
23 import java.util.List;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Future;
28 import java.util.stream.Collectors;
30 import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED;
// Kafka consumer for transfer events whose HTTP endpoints are mapped below
// "/consumer". The class is a Runnable, so it can be submitted to the
// executor service itself (see start()).
// NOTE(review): this listing is partial -- further annotations and the
// brace-only lines of the original file are not visible here.
33 @RequestMapping("/consumer")
35 @RequiredArgsConstructor
37 public class TransferConsumer implements Runnable
// Topic name used in restore() to look up and build the topic partitions.
39 private final String topic;
// Kafka consumer that is polled for records with String keys and values.
40 private final KafkaConsumer<String, String> consumer;
// Executes the restore task and the polling task in the background.
41 private final ExecutorService executorService;
// Deserializes the JSON record values into event objects.
42 private final ObjectMapper mapper;
// productionUseCases receive the records of the normal polling loop,
// restoreUseCases receive the records that are replayed in restore().
43 private final ConsumerUseCases productionUseCases, restoreUseCases;
// State flags; restoring starts out as true, running and shutdown as false.
// NOTE(review): the places where these flags are flipped are not visible
// in this listing -- confirm their life cycle against the full file.
45 private boolean restoring = true;
46 private boolean running = false;
47 private boolean shutdown = false;
// Handle of the most recently submitted background task (restore or polling).
48 private Future<?> future = null;
// Fragment of the polling loop. The enclosing method header and the loop /
// try lines are not visible in this listing (presumably this is the body of
// run() -- TODO confirm).
// Poll the consumer with a timeout of one second.
58 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
// Empty poll; the statement guarded by this condition is not visible here.
59 if (records.count() == 0)
62 log.debug("polled {} records", records.count());
// Every polled record is handled with the production use cases.
63 records.forEach(record -> handleRecord(record, productionUseCases));
// A WakeupException is treated as a clean interruption of the poll.
65 catch (WakeupException e)
67 log.info("cleanly interrupted while polling");
71 log.info("polling stopped");
/**
 * Dispatches a single record to the given use cases. The kind of event is
 * taken from the first byte of the record's {@code EventType.HEADER} header,
 * the payload is read from the record value as JSON.
 *
 * @param record the consumed record
 * @param useCases the use cases that receive the resulting transfer
 */
74 private void handleRecord(ConsumerRecord<String, String> record, ConsumerUseCases useCases)
// The last header with the key EventType.HEADER carries the event type in
// its first byte.
// NOTE(review): no null check is visible here -- a record without that
// header would presumably fail on this line; confirm.
78 byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
82 case EventType.NEW_TRANSFER:
// Parse the payload and create the transfer in state CREATED.
84 NewTransferEvent newTransferEvent =
85 mapper.readValue(record.value(), NewTransferEvent.class);
86 useCases.create(newTransferEvent.toTransfer().setState(CREATED));
89 case EventType.TRANSFER_STATE_CHANGED:
// Parse the payload, look up the transfer by its id and hand it over with
// the new state applied; an unknown id is only logged as an error.
91 TransferStateChangedEvent stateChangedEvent =
92 mapper.readValue(record.value(), TransferStateChangedEvent.class);
94 .get(stateChangedEvent.getId())
96 transfer -> useCases.handle(transfer.setState(stateChangedEvent.getState())),
97 () -> log.error("unknown transfer: {}", stateChangedEvent.getId()));
// Payloads that cannot be parsed as JSON end up here; according to the
// message below, such records are ignored.
101 catch (JsonProcessingException e)
104 "ignoring invalid json in message #{} on {}/{}: {}",
/**
 * Kicks off the restore as a background task and remembers the resulting
 * {@link Future} in {@code future}.
 *
 * @param event the context-refreshed event (not used in the visible lines)
 */
113 public synchronized void onApplicationEvent(ContextRefreshedEvent event)
115 // Needed, because this method is called synchronously during the
116 // initialization phase of Spring. If the restoring is processed
117 // in the same thread, it would block the completion of the initialization.
118 // Hence, the app would not react to any signal (CTRL-C, for example) except
119 // a KILL until the restoring is finished.
120 future = executorService.submit(() -> restore());
/**
 * Replays the records of the topic into the restore use cases until, for
 * every partition, the position recorded here has reached the offset that
 * was the last one when the restore started.
 *
 * NOTE(review): this listing is partial -- the receivers of the stream
 * chains, the loop header and the brace-only lines are not visible here.
 */
123 private void restore()
125 log.info("--> starting restore...");
// One TopicPartition for every partition reported for the topic.
127 List<TopicPartition> partitions =
129 .partitionsFor(topic)
131 .map(info -> new TopicPartition(topic, info.partition()))
132 .collect(Collectors.toList());
// Partition number -> end offset minus one, used below as the offset up to
// which each partition has to be replayed.
134 Map<Integer, Long> lastSeen =
136 .endOffsets(partitions)
139 .collect(Collectors.toMap(
140 entry -> entry.getKey().partition(),
141 entry -> entry.getValue() - 1));
// Partition number -> offset of the last record handled so far (the initial
// value is not visible in this listing).
143 Map<Integer, Long> positions =
147 .collect(Collectors.toMap(
148 partition -> partition,
// The format string used to be "assigning {}}", which made SLF4J print a
// stray closing brace after the partition list.
151 log.info("assigning {}", partitions);
152 consumer.assign(partitions);
// True as long as at least one partition is still behind its last seen
// offset.
159 .map(entry -> entry.getValue() < lastSeen.get(entry.getKey()))
160 .reduce(false, (a, b) -> a || b))
// Poll with a timeout of one second.
164 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
// Empty poll; the statement guarded by this condition is not visible here.
165 if (records.count() == 0)
168 log.debug("polled {} records", records.count());
// Handle every record with the restore use cases and remember its offset
// as the current position of its partition.
169 records.forEach(record ->
171 handleRecord(record, restoreUseCases);
172 positions.put(record.partition(), record.offset());
// A WakeupException is treated as a clean interruption of the restore.
175 catch(WakeupException e)
177 log.info("--> cleanly interrupted while restoring");
182 log.info("--> restore completed!");
185 // We are intentionally _not_ unsubscribing here, since that would
186 // reset the offset to _earliest_, because we disabled offset-commits.
/**
 * HTTP POST endpoint "start": submits this consumer to the executor service
 * as the polling task and keeps the resulting {@link Future}.
 *
 * The visible lines show three possible result texts: "Denied: Restoring!"
 * (logged as an error, the consumer cannot be started while restoring),
 * "Started" and "Restarted".
 * NOTE(review): the conditions that select between them and the final
 * return statement are not visible in this listing.
 *
 * @return a human-readable status text
 */
191 @PostMapping("start")
192 public synchronized String start()
196 log.error("cannot start while restoring");
197 return "Denied: Restoring!";
200 String result = "Started";
205 result = "Restarted";
// Run this instance (it is a Runnable) as a background task.
209 future = executorService.submit(this);
/**
 * Stops the background task and reports the outcome as a status text.
 *
 * Returns "Not running" if the consumer is neither running nor restoring.
 * If joining the background task fails, the exception is logged and its
 * message is returned.
 * NOTE(review): the lines that actually interrupt and join the task, and
 * the regular return statement, are not visible in this listing.
 *
 * @return a human-readable status text
 */
216 public synchronized String stop()
// Nothing to stop if neither the polling nor the restore is active.
218 if (!(running || restoring))
220 log.info("not running!");
221 return "Not running";
// Only a task that has not finished yet has to be waited for.
226 if (!future.isDone())
229 log.info("waiting for the consumer...");
// NOTE(review): no re-interrupt of the current thread is visible after an
// InterruptedException -- confirm whether that is intended.
234 catch (InterruptedException|ExecutionException e)
236 log.error("Exception while joining polling task!", e);
237 return e.getMessage();
/**
 * Shuts the consumer down. The visible lines only log the start of the
 * shutdown and the closing of the consumer.
 * NOTE(review): the statements between and after the two log calls are not
 * visible in this listing.
 */
248 public synchronized void shutdown()
250 log.info("shutdown initiated!");
253 log.info("closing consumer");
/**
 * Bundles the use cases that the consumer needs into a single type, so that
 * one implementation can be passed for production and one for the restore.
 * NOTE(review): the line between the interface header and the supertypes
 * below is not visible in this listing; since handleRecord() calls
 * {@code useCases.get(...)}, it presumably names GetTransferUseCase --
 * confirm.
 */
258 public interface ConsumerUseCases
261 CreateTransferUseCase,
262 HandleStateChangeUseCase {};