1 package de.juplo.kafka.payment.transfer.adapter;
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
6 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
7 import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
8 import lombok.RequiredArgsConstructor;
9 import lombok.extern.slf4j.Slf4j;
10 import org.apache.kafka.clients.consumer.ConsumerRecord;
11 import org.apache.kafka.clients.consumer.ConsumerRecords;
12 import org.apache.kafka.clients.consumer.KafkaConsumer;
13 import org.apache.kafka.common.TopicPartition;
14 import org.apache.kafka.common.errors.WakeupException;
15 import org.springframework.context.event.ContextRefreshedEvent;
16 import org.springframework.context.event.EventListener;
17 import org.springframework.web.bind.annotation.PostMapping;
18 import org.springframework.web.bind.annotation.RequestMapping;
19 import org.springframework.web.bind.annotation.ResponseBody;
21 import java.time.Duration;
22 import java.util.List;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Future;
27 import java.util.stream.Collectors;
30 @RequestMapping("/consumer")
32 @RequiredArgsConstructor
// Kafka consumer for transfer events, exposed as a REST controller under
// /consumer so the polling loop can be started/stopped at runtime. Implements
// Runnable: run() is the polling loop, submitted to the injected ExecutorService.
34 public class TransferConsumer implements Runnable
// Topic holding the transfer events.
36 private final String topic;
// Manually driven consumer: partitions are assigned explicitly in restore()
// and offset-commits are disabled (see the comment at the end of restore()).
37 private final KafkaConsumer<String, String> consumer;
// Executes the polling loop and the restore task off the Spring startup thread.
38 private final ExecutorService executorService;
// JSON (de)serialization of the event payloads.
39 private final ObjectMapper mapper;
// Use-case bundle for live processing vs. the one used while replaying events.
40 private final ConsumerUseCases productionUseCases, restoreUseCases;
// Lifecycle flags; mutated only inside the synchronized methods of this class.
42 private boolean restoring = true;
43 private boolean running = false;
44 private boolean shutdown = false;
// Handle of the currently submitted polling/restore task.
45 private Future<?> future = null;
// Polling loop body (run(); signature elided in this view): fetch batches and
// dispatch each record to the production use-cases; the loop exits when
// consumer.wakeup() raises a WakeupException inside poll().
55 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
56 if (records.count() == 0)
// (empty-poll branch elided here — presumably just continues polling)
59 log.debug("polled {} records", records.count());
60 records.forEach(record -> handleRecord(record, productionUseCases));
// Expected, clean exit signal — presumably triggered by stop()/shutdown()
// calling consumer.wakeup(); not visible in this view.
62 catch (WakeupException e)
64 log.info("cleanly interrupted while polling");
68 log.info("polling stopped");
// Deserializes one record and routes it to the matching use-case, keyed by the
// first byte of the record's EventType header. Malformed messages are logged
// and skipped (poison-pill handling) so a bad record cannot stall the partition.
71 private void handleRecord(ConsumerRecord<String, String> record, ConsumerUseCases useCases)
// Event-type discriminator: first byte of the last EventType header value.
75 byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
79 case EventType.NEW_TRANSFER:
81 NewTransferEvent newTransferEvent =
82 mapper.readValue(record.value(), NewTransferEvent.class);
// (the use-case call receiving these arguments is partially elided here)
85 newTransferEvent.getId(),
86 newTransferEvent.getPayer(),
87 newTransferEvent.getPayee(),
88 newTransferEvent.getAmount());
91 case EventType.TRANSFER_STATE_CHANGED:
93 TransferStateChangedEvent stateChangedEvent =
94 mapper.readValue(record.value(), TransferStateChangedEvent.class);
95 useCases.handleStateChange(stateChangedEvent.getId(), stateChangedEvent.getState());
// Invalid JSON: log and drop — intentionally not rethrown.
99 catch (JsonProcessingException e)
102 "ignoring invalid json in message #{} on {}/{}: {}",
// Unknown event type / invalid payload values: likewise logged and dropped.
108 catch (IllegalArgumentException e)
111 "ignoring invalid message #{} on {}/{}: {}, message={}",
// Kicks off the state restore asynchronously once the Spring context is ready.
121 public synchronized void onApplicationEvent(ContextRefreshedEvent event)
123 // Needed, because this method is called synchronously during the
124 // initialization phase of Spring. If the restoring is processed
125 // in the same thread, it would block the completion of the initialization.
126 // Hence, the app would not react to any signal (CTRL-C, for example) except
127 // a KILL until the restoring is finished.
128 future = executorService.submit(() -> restore());
// Replays all previously written events from the topic through the restore
// use-cases to rebuild the local state, then leaves the consumer positioned
// at the end of each partition for live processing.
131 private void restore()
133 log.info("--> starting restore...");
// All partitions of the transfer topic.
135 List<TopicPartition> partitions =
137 .partitionsFor(topic)
139 .map(info -> new TopicPartition(topic, info.partition()))
140 .collect(Collectors.toList());
// Offset of the last existing record per partition (endOffset - 1).
// NOTE(review): an empty partition yields -1 here — verify the loop condition
// below copes with that (the positions initialization is elided in this view).
142 Map<Integer, Long> lastSeen =
144 .endOffsets(partitions)
147 .collect(Collectors.toMap(
148 entry -> entry.getKey().partition(),
149 entry -> entry.getValue() - 1));
// Last offset handled so far, per partition.
151 Map<Integer, Long> positions =
155 .collect(Collectors.toMap(
156 partition -> partition,
// NOTE(review): stray '}' in the format string — "assigning {}}" logs a literal
// '}' after the partition list; "assigning {}" was probably intended.
159 log.info("assigning {}}", partitions);
160 consumer.assign(partitions);
// Keep polling while any partition is still behind its last known record.
167 .map(entry -> entry.getValue() < lastSeen.get(entry.getKey()))
168 .reduce(false, (a, b) -> a || b))
172 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
173 if (records.count() == 0)
176 log.debug("polled {} records", records.count());
// Replay through the restore use-cases and track progress per partition.
177 records.forEach(record ->
179 handleRecord(record, restoreUseCases);
180 positions.put(record.partition(), record.offset());
183 catch(WakeupException e)
185 log.info("--> cleanly interrupted while restoring");
190 log.info("--> restore completed!");
193 // We are intentionally _not_ unsubscribing here, since that would
194 // reset the offset to _earliest_, because we disabled offset-commits.
// REST endpoint: starts (or restarts) the polling loop. Refused while the
// initial state restore is still running.
199 @PostMapping("start")
200 public synchronized String start()
204 log.error("cannot start while restoring");
205 return "Denied: Restoring!";
208 String result = "Started";
// (restart branch partially elided in this view)
213 result = "Restarted";
// Submit this Runnable — run() contains the polling loop.
217 future = executorService.submit(this);
// Stops the polling/restore task and waits for it to terminate
// (the endpoint mapping annotation is elided in this view).
224 public synchronized String stop()
226 if (!(running || restoring))
228 log.info("not running!");
229 return "Not running";
234 if (!future.isDone())
// Block until the task has actually finished.
237 log.info("waiting for the consumer...");
// NOTE(review): on InterruptedException the interrupt status should be
// restored via Thread.currentThread().interrupt() before returning.
242 catch (InterruptedException|ExecutionException e)
244 log.error("Exception while joining polling task!", e);
245 return e.getMessage();
// Final shutdown: stops processing and closes the Kafka consumer
// (intermediate statements elided in this view).
256 public synchronized void shutdown()
258 log.info("shutdown initiated!");
261 log.info("closing consumer");
// Bundles the use-case ports a consumer needs, so the production and the
// restore variants can be injected as single dependencies.
266 public interface ConsumerUseCases
269 CreateTransferUseCase,
270 HandleStateChangeUseCase {};