1 package de.juplo.kafka.payment.transfer.adapter;
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
6 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
7 import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
8 import lombok.RequiredArgsConstructor;
9 import lombok.extern.slf4j.Slf4j;
10 import org.apache.kafka.clients.consumer.ConsumerRecord;
11 import org.apache.kafka.clients.consumer.ConsumerRecords;
12 import org.apache.kafka.clients.consumer.KafkaConsumer;
13 import org.apache.kafka.common.TopicPartition;
14 import org.apache.kafka.common.errors.WakeupException;
15 import org.springframework.context.event.ContextRefreshedEvent;
16 import org.springframework.context.event.EventListener;
17 import org.springframework.web.bind.annotation.PostMapping;
18 import org.springframework.web.bind.annotation.RequestMapping;
19 import org.springframework.web.bind.annotation.ResponseBody;
21 import java.time.Duration;
22 import java.util.List;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Future;
27 import java.util.stream.Collectors;
/**
 * Kafka consumer adapter for transfer events, mapped under {@code /consumer}.
 *
 * The instance is a {@link Runnable} that is submitted to the configured
 * executor service. Records are decoded in {@code handleRecord} and handed
 * either to the restore use cases (while replaying the topic) or to the
 * production use cases (while polling).
 */
30 @RequestMapping("/consumer")
32 @RequiredArgsConstructor
34 public class TransferConsumer implements Runnable
// Topic whose partitions are looked up and assigned in restore().
36 private final String topic;
// The Kafka client that is polled and that receives the partition assignment.
37 private final KafkaConsumer<String, String> consumer;
// Runs the restore task and the polling task (see the submit(...) calls).
38 private final ExecutorService executorService;
// Parses the JSON record values into event objects.
39 private final ObjectMapper mapper;
// Use cases for records seen while polling and while restoring, respectively.
40 private final ConsumerUseCases productionUseCases, restoreUseCases;
// State flags; the instance starts out in restoring mode and not running.
42 private boolean restoring = true;
43 private boolean running = false;
// NOTE(review): not read in any of the lines visible here.
44 private boolean shutdown = false;
// Handle of the most recently submitted task (restore or polling).
45 private Future<?> future = null;
// Poll step. The enclosing method header is not visible in this excerpt;
// presumably this is the body of run() — TODO confirm.
// Waits up to one second for records and hands every polled record to
// handleRecord together with the production use cases.
55 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
56 if (records.count() == 0)
59 log.debug("polled {} records", records.count());
60 records.forEach(record -> handleRecord(record, productionUseCases));
// A WakeupException raised by the consumer is treated as a clean interruption.
62 catch (WakeupException e)
64 log.info("cleanly interrupted while polling");
68 log.info("polling stopped");
/**
 * Decodes one consumed record and dispatches it to the given use cases.
 *
 * The event type is taken from the first byte of the last
 * {@code EventType.HEADER} header; the record value is then parsed as JSON
 * into the event class that belongs to that type.
 *
 * @param record   the consumed record (value is expected to be JSON)
 * @param useCases the use cases that receive the decoded event
 */
71 private void handleRecord(ConsumerRecord<String, String> record, ConsumerUseCases useCases)
// NOTE(review): lastHeader(...) returns null when the record carries no such
// header, and value() may be empty — neither case is guarded in the visible lines.
75 byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
79 case EventType.NEW_TRANSFER:
81 NewTransferEvent newTransferEvent =
82 mapper.readValue(record.value(), NewTransferEvent.class);
// Id, payer, payee and amount of the event are passed on as arguments; the
// call that receives them is on a line not visible in this excerpt.
85 newTransferEvent.getId(),
86 newTransferEvent.getPayer(),
87 newTransferEvent.getPayee(),
88 newTransferEvent.getAmount());
91 case EventType.TRANSFER_STATE_CHANGED:
93 TransferStateChangedEvent stateChangedEvent =
94 mapper.readValue(record.value(), TransferStateChangedEvent.class);
95 useCases.handleStateChange(stateChangedEvent.getId(), stateChangedEvent.getState());
// A value that cannot be parsed is reported with the message below and,
// according to that message, the record is ignored.
99 catch (JsonProcessingException e)
102 "ignoring invalid json in message #{} on {}/{}: {}",
/**
 * Handler for {@link ContextRefreshedEvent}: submits the restore run to the
 * executor service and remembers the resulting future.
 *
 * @param event the context-refreshed event (not used in the visible lines)
 */
111 public synchronized void onApplicationEvent(ContextRefreshedEvent event)
113 // Needed, because this method is called synchronously during the
114 // initialization phase of Spring. If the restoring is processed
115 // in the same thread, it would block the completion of the initialization.
116 // Hence, the app would not react to any signal (CTRL-C, for example) except
117 // a KILL until the restoring is finished.
118 future = executorService.submit(() -> restore());
/**
 * Replays the topic through the restore use cases.
 *
 * Looks up the partitions of the topic, records a target offset for each of
 * them, assigns the partitions to the consumer and then polls until no
 * partition is behind its target any more. Several lines of this method are
 * not visible in this excerpt, so the comments below cover only what is shown.
 */
121 private void restore()
123 log.info("--> starting restore...");
// One TopicPartition per partition reported for the topic.
125 List<TopicPartition> partitions =
127 .partitionsFor(topic)
129 .map(info -> new TopicPartition(topic, info.partition()))
130 .collect(Collectors.toList());
// Per partition number: the reported end offset minus one.
132 Map<Integer, Long> lastSeen =
134 .endOffsets(partitions)
137 .collect(Collectors.toMap(
138 entry -> entry.getKey().partition(),
139 entry -> entry.getValue() - 1));
// Per partition number: the offset handled so far (the initial value is
// computed on a line not visible here).
141 Map<Integer, Long> positions =
145 .collect(Collectors.toMap(
146 partition -> partition,
// NOTE(review): the format string below has an unbalanced "}" after the
// placeholder, which will show up literally in the log output.
149 log.info("assigning {}}", partitions);
150 consumer.assign(partitions);
// True as long as at least one partition's recorded position is below its
// lastSeen value.
157 .map(entry -> entry.getValue() < lastSeen.get(entry.getKey()))
158 .reduce(false, (a, b) -> a || b))
162 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
163 if (records.count() == 0)
166 log.debug("polled {} records", records.count());
167 records.forEach(record ->
169 handleRecord(record, restoreUseCases);
// Remember the offset of the record just handled for its partition.
170 positions.put(record.partition(), record.offset());
173 catch(WakeupException e)
175 log.info("--> cleanly interrupted while restoring");
180 log.info("--> restore completed!");
183 // We are intentionally _not_ unsubscribing here, since that would
184 // reset the offset to _earliest_, because we disabled offset-commits.
/**
 * POST {@code start}: submits this consumer to the executor service and keeps
 * the resulting future.
 *
 * The guard conditions are on lines not visible in this excerpt. What is
 * visible: one branch logs that starting is not possible while restoring and
 * returns {@code "Denied: Restoring!"}; otherwise the result text starts as
 * {@code "Started"} and may be replaced by {@code "Restarted"}.
 *
 * @return a status text (the final return statement is not visible here —
 *         presumably {@code result})
 */
189 @PostMapping("start")
190 public synchronized String start()
194 log.error("cannot start while restoring");
195 return "Denied: Restoring!";
198 String result = "Started";
203 result = "Restarted";
207 future = executorService.submit(this);
/**
 * Stops the consumer task, if there is one.
 *
 * Returns {@code "Not running"} right away when the instance is neither
 * running nor restoring. Otherwise the submitted task is checked and, if it
 * has not finished yet, waited for (the statements that interrupt and join
 * the task are on lines not visible in this excerpt).
 *
 * @return a status text, or the exception message if waiting for the task
 *         failed
 */
214 public synchronized String stop()
216 if (!(running || restoring))
218 log.info("not running!");
219 return "Not running";
224 if (!future.isDone())
227 log.info("waiting for the consumer...");
// NOTE(review): no re-interrupt of the current thread is visible after
// catching InterruptedException — confirm against the full file.
232 catch (InterruptedException|ExecutionException e)
234 log.error("Exception while joining polling task!", e);
235 return e.getMessage();
/**
 * Shuts the adapter down. The visible lines only log the two phases
 * ("shutdown initiated!" and "closing consumer"); the statements in between
 * and after them are not visible in this excerpt.
 */
246 public synchronized void shutdown()
248 log.info("shutdown initiated!");
251 log.info("closing consumer");
/**
 * Bundle of the use-case ports that {@code handleRecord} dispatches to.
 * The supertypes visible here are {@code CreateTransferUseCase} and
 * {@code HandleStateChangeUseCase}; the start of the supertype list is on a
 * line not visible in this excerpt.
 */
256 public interface ConsumerUseCases
259 CreateTransferUseCase,
260 HandleStateChangeUseCase {};