TransferConsumer logs the number of polled records only when it is positive
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / adapter / TransferConsumer.java
1 package de.juplo.kafka.payment.transfer.adapter;
2
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.domain.Transfer;
6 import de.juplo.kafka.payment.transfer.ports.HandleTransferUseCase;
7 import lombok.RequiredArgsConstructor;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.consumer.ConsumerRecords;
10 import org.apache.kafka.clients.consumer.KafkaConsumer;
11 import org.apache.kafka.common.errors.WakeupException;
12 import org.springframework.web.bind.annotation.PostMapping;
13 import org.springframework.web.bind.annotation.RequestMapping;
14 import org.springframework.web.bind.annotation.ResponseBody;
15
16 import java.time.Duration;
17 import java.util.Set;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Future;
21
22
23 @RequestMapping("/consumer")
24 @ResponseBody
25 @RequiredArgsConstructor
26 @Slf4j
27 public class TransferConsumer implements Runnable
28 {
29   private final String topic;
30   private final KafkaConsumer<String, String> consumer;
31   private final ExecutorService executorService;
32   private final ObjectMapper mapper;
33   private final HandleTransferUseCase handleTransferUseCase;
34
35   private boolean running = false;
36   private Future<?> future = null;
37
38
39   @Override
40   public void run()
41   {
42     while (running)
43     {
44       try
45       {
46         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
47         if (records.count() > 0)
48           log.debug("polled {} records", records.count());
49
50         records.forEach(record ->
51         {
52           try
53           {
54             Transfer transfer = mapper.readValue(record.value(), Transfer.class);
55             handleTransferUseCase.handle(transfer);
56           }
57           catch (JsonProcessingException e)
58           {
59             log.error(
60                 "ignoring invalid json in message #{} on {}/{}: {}",
61                 record.offset(),
62                 record.topic(),
63                 record.partition(),
64                 record.value());
65           }
66         });
67       }
68       catch (WakeupException e)
69       {
70         log.info("polling aborted!");
71       }
72     }
73
74     log.info("polling stopped");
75   }
76
77
78   @PostMapping("start")
79   public synchronized String start()
80   {
81     String result = "Started";
82
83     if (running)
84     {
85       stop();
86       result = "Restarted";
87     }
88
89     log.info("subscribing to topic {}", topic);
90     consumer.subscribe(Set.of(topic));
91     running = true;
92     future = executorService.submit(this);
93
94     return result;
95   }
96
97   @PostMapping("stop")
98   public synchronized String stop()
99   {
100     if (!running)
101     {
102       log.info("not running!");
103       return "Not running";
104     }
105
106     running = false;
107     if (!future.isDone())
108       consumer.wakeup();
109     log.info("waiting for the polling-loop to finish...");
110     try
111     {
112       future.get();
113     }
114     catch (InterruptedException|ExecutionException e)
115     {
116       log.error("Exception while joining polling task!", e);
117       return e.getMessage();
118     }
119     finally
120     {
121       future = null;
122       log.info("unsubscribing");
123       consumer.unsubscribe();
124     }
125
126     return "Stoped";
127   }
128
129   public synchronized void shutdown()
130   {
131     log.info("shutdown initiated!");
132     stop();
133     log.info("closing consumer");
134     consumer.close();
135   }
136 }