X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fadapter%2FTransferConsumer.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fadapter%2FTransferConsumer.java;h=17d91dea868b6d9f535984a0234882e27efcbde7;hp=0000000000000000000000000000000000000000;hb=4467c5240397a47b181106a0ae902ed1b71d0c5d;hpb=540f0c5e8ef2c815d7ff37c7af2e119c448cbb1b diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java new file mode 100644 index 0000000..17d91de --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java @@ -0,0 +1,135 @@ +package de.juplo.kafka.payment.transfer.adapter; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.payment.transfer.domain.Transfer; +import de.juplo.kafka.payment.transfer.ports.HandleTransferUseCase; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.WakeupException; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.ResponseBody; + +import java.time.Duration; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + + +@RequestMapping("/consumer") +@ResponseBody +@RequiredArgsConstructor +@Slf4j +public class TransferConsumer implements Runnable +{ + private final String topic; + private final KafkaConsumer consumer; + private final ExecutorService executorService; + private final ObjectMapper mapper; + private final HandleTransferUseCase handleTransferUseCase; + + private boolean running = false; + private Future future = null; + + + @Override + public void run() + { + while (running) + { + try + { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + log.debug("polled {} records", records.count()); + + records.forEach(record -> + { + try + { + Transfer transfer = mapper.readValue(record.value(), Transfer.class); + handleTransferUseCase.handle(transfer); + } + catch (JsonProcessingException e) + { + log.error( + "ignoring invalid json in message #{} on {}/{}: {}", + record.offset(), + record.topic(), + record.partition(), + record.value()); + } + }); + } + catch (WakeupException e) + { + log.info("polling aborted!"); + } + } + + log.info("polling stopped"); + } + + + @PostMapping("start") + public synchronized String start() + { + String result = "Started"; + + if (running) + { + stop(); + result = "Restarted"; + } + + log.info("subscribing to topic {}", topic); + consumer.subscribe(Set.of(topic)); + running = true; + future = executorService.submit(this); + + return result; + } + + @PostMapping("stop") + public synchronized String stop() + { + if (!running) + { + log.info("not running!"); + return "Not running"; + } + + running = false; + if (!future.isDone()) + consumer.wakeup(); + log.info("waiting for the polling-loop to finish..."); + try + { + future.get(); + } + catch (InterruptedException|ExecutionException e) + { + log.error("Exception while joining polling task!", e); + return e.getMessage(); + } + finally + { + future = null; + log.info("unsubscribing"); + consumer.unsubscribe(); + } + + return "Stoped"; + } + + public synchronized void shutdown() + { + log.info("shutdown initiated!"); + stop(); + log.info("closing consumer"); + consumer.close(); + } +}