1 package de.juplo.kafka.payment.transfer.adapter;
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.domain.Transfer;
6 import de.juplo.kafka.payment.transfer.ports.HandleTransferUseCase;
7 import lombok.RequiredArgsConstructor;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.consumer.ConsumerRecords;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 import org.apache.kafka.common.errors.WakeupException;
12 import org.springframework.web.bind.annotation.PostMapping;
13 import org.springframework.web.bind.annotation.RequestMapping;
14 import org.springframework.web.bind.annotation.ResponseBody;
16 import java.time.Duration;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Future;
// Kafka consumer for transfer messages whose request handlers are mapped
// below the "/consumer" path. The class implements Runnable so that the
// instance itself can be handed to the ExecutorService as the polling task.
23 @RequestMapping("/consumer")
// Lombok: generates a constructor that takes every final field declared below.
25 @RequiredArgsConstructor
27 public class TransferConsumer implements Runnable
// Name of the Kafka topic that start() subscribes to.
29 private final String topic;
// Kafka client with String keys and String values; polled by the polling
// loop, subscribed in start() and unsubscribed in stop().
30 private final KafkaConsumer<String, String> consumer;
// Executes the polling task (this instance) submitted by start().
31 private final ExecutorService executorService;
// Jackson mapper used to turn a record value (JSON text) into a Transfer.
32 private final ObjectMapper mapper;
// Port that receives every successfully deserialized Transfer.
33 private final HandleTransferUseCase handleTransferUseCase;
// Lifecycle flag, initially false.
// NOTE(review): this is a plain (non-volatile) boolean. If the polling thread
// reads it outside of a synchronized block, a change made by another thread is
// not guaranteed to become visible -- confirm against the code not shown here.
35 private boolean running = false;
// Handle of the polling task submitted to the executor; null until start()
// has submitted the task.
36 private Future<?> future = null;
// Body of the polling loop. The enclosing method header and loop condition
// are not visible in this excerpt; presumably this is run() from Runnable,
// executed on the ExecutorService -- TODO confirm.
//
// Fetch the next batch of records, blocking for at most one second.
46 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
// Only emit the debug line when the batch is non-empty.
47 if (records.count() > 0)
48 log.debug("polled {} records", records.count());
// Handle each record of the batch inside the lambda.
50 records.forEach(record ->
// Deserialize the record value (JSON) into a Transfer and pass it on to
// the use case.
54 Transfer transfer = mapper.readValue(record.value(), Transfer.class);
55 handleTransferUseCase.handle(transfer);
// A value that Jackson cannot process is reported with the message below;
// going by its wording the record is ignored and processing continues
// (log level and arguments are not visible here -- TODO confirm).
57 catch (JsonProcessingException e)
60 "ignoring invalid json in message #{} on {}/{}: {}",
// WakeupException is only logged here. Presumably it is provoked on purpose
// by a consumer.wakeup() call made while stopping -- that call is not visible
// in this excerpt, TODO confirm.
68 catch (WakeupException e)
70 log.info("polling aborted!");
// Logged once the polling code has been left.
74 log.info("polling stopped");
// Subscribes the consumer to the configured topic and submits this instance
// as the polling task. Synchronized on this instance.
// Returns a status text for the caller.
79 public synchronized String start()
// Default status text. The lines between this assignment and the subscribe
// call are not visible in this excerpt; presumably they cover the case that
// polling is already active and may replace the text -- TODO confirm.
81 String result = "Started";
89 log.info("subscribing to topic {}", topic);
// Subscribe to exactly one topic.
90 consumer.subscribe(Set.of(topic));
// Start the polling task on the executor and keep its Future so that the
// task can be joined later.
92 future = executorService.submit(this);
// Stops polling and unsubscribes the consumer. Synchronized on this instance.
// Returns a status text; on a failed join the exception message is returned
// instead.
98 public synchronized String stop()
// Early exit with a fixed status text. The guarding condition is not visible
// in this excerpt; presumably it tests the running flag -- TODO confirm.
102 log.info("not running!");
103 return "Not running";
// Only wait if the polling task has not completed yet. The actual wait
// (presumably future.get() inside a try block) is not visible here.
107 if (!future.isDone())
109 log.info("waiting for the polling-loop to finish...");
// Failure while joining the polling task: logged with stack trace, and the
// exception message becomes the return value.
// NOTE(review): no Thread.currentThread().interrupt() is visible after
// catching InterruptedException, so the interrupt status appears to be lost.
// NOTE(review): getMessage() can be null and exposes internal details to the
// caller -- confirm that this is intended.
114 catch (InterruptedException|ExecutionException e)
116 log.error("Exception while joining polling task!", e);
117 return e.getMessage();
// Drop the topic subscription of the consumer.
122 log.info("unsubscribing");
123 consumer.unsubscribe();
// Shutdown hook of this component. Synchronized on this instance.
// Only the two log statements are visible in this excerpt: whatever happens
// between them, and the close call announced by the second message, are not
// shown -- presumably stop() followed by consumer.close(), TODO confirm.
129 public synchronized void shutdown()
131 log.info("shutdown initiated!");
133 log.info("closing consumer");