1 package de.juplo.kafka.payment.transfer.adapter;
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.domain.Transfer;
6 import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
7 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
8 import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
9 import lombok.RequiredArgsConstructor;
10 import lombok.extern.slf4j.Slf4j;
11 import org.apache.kafka.clients.consumer.ConsumerRecords;
12 import org.apache.kafka.clients.consumer.KafkaConsumer;
13 import org.apache.kafka.common.errors.WakeupException;
14 import org.springframework.web.bind.annotation.PostMapping;
15 import org.springframework.web.bind.annotation.RequestMapping;
16 import org.springframework.web.bind.annotation.ResponseBody;
18 import java.time.Duration;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Future;
24 import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED;
27 @RequestMapping("/consumer")
29 @RequiredArgsConstructor
31 public class TransferConsumer implements Runnable
33 private final String topic;
34 private final KafkaConsumer<String, String> consumer;
35 private final ExecutorService executorService;
36 private final ObjectMapper mapper;
37 private final GetTransferUseCase getTransferUseCase;
38 private final CreateTransferUseCase createTransferUseCase;
39 private final HandleStateChangeUseCase handleStateChangeUseCase;
41 private boolean running = false;
42 private Future<?> future = null;
52 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
53 if (records.count() > 0)
54 log.debug("polled {} records", records.count());
56 records.forEach(record ->
60 byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
64 case EventType.NEW_TRANSFER:
66 NewTransferEvent newTransferEvent =
67 mapper.readValue(record.value(), NewTransferEvent.class);
68 createTransferUseCase.create(newTransferEvent.toTransfer().setState(CREATED));
71 case EventType.TRANSFER_STATE_CHANGED:
73 TransferStateChangedEvent stateChangedEvent =
74 mapper.readValue(record.value(), TransferStateChangedEvent.class);
76 .get(stateChangedEvent.getId())
78 transfer -> handleStateChangeUseCase.handle(transfer.setState(stateChangedEvent.getState())),
79 () -> log.error("unknown transfer: {}", stateChangedEvent.getId()));
83 catch (JsonProcessingException e)
86 "ignoring invalid json in message #{} on {}/{}: {}",
94 catch (WakeupException e)
96 log.info("polling aborted!");
100 log.info("polling stopped");
104 @PostMapping("start")
105 public synchronized String start()
107 String result = "Started";
112 result = "Restarted";
115 log.info("subscribing to topic {}", topic);
116 consumer.subscribe(Set.of(topic));
118 future = executorService.submit(this);
124 public synchronized String stop()
128 log.info("not running!");
129 return "Not running";
133 if (!future.isDone())
135 log.info("waiting for the polling-loop to finish...");
140 catch (InterruptedException|ExecutionException e)
142 log.error("Exception while joining polling task!", e);
143 return e.getMessage();
148 log.info("unsubscribing");
149 consumer.unsubscribe();
155 public synchronized void shutdown()
157 log.info("shutdown initiated!");
159 log.info("closing consumer");