17d91dea868b6d9f535984a0234882e27efcbde7
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / adapter / TransferConsumer.java
1 package de.juplo.kafka.payment.transfer.adapter;
2
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.domain.Transfer;
6 import de.juplo.kafka.payment.transfer.ports.HandleTransferUseCase;
7 import lombok.RequiredArgsConstructor;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.consumer.ConsumerRecords;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 import org.apache.kafka.common.errors.WakeupException;
12 import org.springframework.web.bind.annotation.PostMapping;
13 import org.springframework.web.bind.annotation.RequestMapping;
14 import org.springframework.web.bind.annotation.ResponseBody;
15
16 import java.time.Duration;
17 import java.util.Set;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Future;
21
22
23 @RequestMapping("/consumer")
24 @ResponseBody
25 @RequiredArgsConstructor
26 @Slf4j
27 public class TransferConsumer implements Runnable
28 {
29   private final String topic;
30   private final KafkaConsumer<String, String> consumer;
31   private final ExecutorService executorService;
32   private final ObjectMapper mapper;
33   private final HandleTransferUseCase handleTransferUseCase;
34
35   private boolean running = false;
36   private Future<?> future = null;
37
38
39   @Override
40   public void run()
41   {
42     while (running)
43     {
44       try
45       {
46         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
47         log.debug("polled {} records", records.count());
48
49         records.forEach(record ->
50         {
51           try
52           {
53             Transfer transfer = mapper.readValue(record.value(), Transfer.class);
54             handleTransferUseCase.handle(transfer);
55           }
56           catch (JsonProcessingException e)
57           {
58             log.error(
59                 "ignoring invalid json in message #{} on {}/{}: {}",
60                 record.offset(),
61                 record.topic(),
62                 record.partition(),
63                 record.value());
64           }
65         });
66       }
67       catch (WakeupException e)
68       {
69         log.info("polling aborted!");
70       }
71     }
72
73     log.info("polling stopped");
74   }
75
76
77   @PostMapping("start")
78   public synchronized String start()
79   {
80     String result = "Started";
81
82     if (running)
83     {
84       stop();
85       result = "Restarted";
86     }
87
88     log.info("subscribing to topic {}", topic);
89     consumer.subscribe(Set.of(topic));
90     running = true;
91     future = executorService.submit(this);
92
93     return result;
94   }
95
96   @PostMapping("stop")
97   public synchronized String stop()
98   {
99     if (!running)
100     {
101       log.info("not running!");
102       return "Not running";
103     }
104
105     running = false;
106     if (!future.isDone())
107       consumer.wakeup();
108     log.info("waiting for the polling-loop to finish...");
109     try
110     {
111       future.get();
112     }
113     catch (InterruptedException|ExecutionException e)
114     {
115       log.error("Exception while joining polling task!", e);
116       return e.getMessage();
117     }
118     finally
119     {
120       future = null;
121       log.info("unsubscribing");
122       consumer.unsubscribe();
123     }
124
125     return "Stoped";
126   }
127
128   public synchronized void shutdown()
129   {
130     log.info("shutdown initiated!");
131     stop();
132     log.info("closing consumer");
133     consumer.close();
134   }
135 }