1 package de.juplo.kafka.payment.transfer.adapter;
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.domain.Transfer;
6 import de.juplo.kafka.payment.transfer.ports.HandleTransferUseCase;
7 import lombok.RequiredArgsConstructor;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.consumer.ConsumerRecords;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 import org.apache.kafka.common.errors.WakeupException;
12 import org.springframework.web.bind.annotation.PostMapping;
13 import org.springframework.web.bind.annotation.RequestMapping;
14 import org.springframework.web.bind.annotation.ResponseBody;
16 import java.time.Duration;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Future;
23 @RequestMapping("/consumer")
25 @RequiredArgsConstructor
27 public class TransferConsumer implements Runnable
29 private final String topic;
30 private final KafkaConsumer<String, String> consumer;
31 private final ExecutorService executorService;
32 private final ObjectMapper mapper;
33 private final HandleTransferUseCase handleTransferUseCase;
35 private boolean running = false;
36 private Future<?> future = null;
46 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
47 log.debug("polled {} records", records.count());
49 records.forEach(record ->
53 Transfer transfer = mapper.readValue(record.value(), Transfer.class);
54 handleTransferUseCase.handle(transfer);
56 catch (JsonProcessingException e)
59 "ignoring invalid json in message #{} on {}/{}: {}",
67 catch (WakeupException e)
69 log.info("polling aborted!");
73 log.info("polling stopped");
78 public synchronized String start()
80 String result = "Started";
88 log.info("subscribing to topic {}", topic);
89 consumer.subscribe(Set.of(topic));
91 future = executorService.submit(this);
97 public synchronized String stop()
101 log.info("not running!");
102 return "Not running";
106 if (!future.isDone())
108 log.info("waiting for the polling-loop to finish...");
113 catch (InterruptedException|ExecutionException e)
115 log.error("Exception while joining polling task!", e);
116 return e.getMessage();
121 log.info("unsubscribing");
122 consumer.unsubscribe();
128 public synchronized void shutdown()
130 log.info("shutdown initiated!");
132 log.info("closing consumer");