Introduced different Events for the creation and the state-changes
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / adapter / TransferConsumer.java
1 package de.juplo.kafka.payment.transfer.adapter;
2
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.domain.Transfer;
6 import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
7 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
8 import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
9 import lombok.RequiredArgsConstructor;
10 import lombok.extern.slf4j.Slf4j;
11 import org.apache.kafka.clients.consumer.ConsumerRecords;
12 import org.apache.kafka.clients.consumer.KafkaConsumer;
13 import org.apache.kafka.common.errors.WakeupException;
14 import org.springframework.web.bind.annotation.PostMapping;
15 import org.springframework.web.bind.annotation.RequestMapping;
16 import org.springframework.web.bind.annotation.ResponseBody;
17
18 import java.time.Duration;
19 import java.util.Set;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Future;
23
24 import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED;
25
26
27 @RequestMapping("/consumer")
28 @ResponseBody
29 @RequiredArgsConstructor
30 @Slf4j
31 public class TransferConsumer implements Runnable
32 {
33   private final String topic;
34   private final KafkaConsumer<String, String> consumer;
35   private final ExecutorService executorService;
36   private final ObjectMapper mapper;
37   private final GetTransferUseCase getTransferUseCase;
38   private final CreateTransferUseCase createTransferUseCase;
39   private final HandleStateChangeUseCase handleStateChangeUseCase;
40
41   private boolean running = false;
42   private Future<?> future = null;
43
44
45   @Override
46   public void run()
47   {
48     while (running)
49     {
50       try
51       {
52         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
53         if (records.count() > 0)
54           log.debug("polled {} records", records.count());
55
56         records.forEach(record ->
57         {
58           try
59           {
60             byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
61
62             switch (eventType)
63             {
64               case EventType.NEW_TRANSFER:
65
66                 NewTransferEvent newTransferEvent =
67                     mapper.readValue(record.value(), NewTransferEvent.class);
68                 createTransferUseCase.create(newTransferEvent.toTransfer().setState(CREATED));
69                 break;
70
71               case EventType.TRANSFER_STATE_CHANGED:
72
73                 TransferStateChangedEvent stateChangedEvent =
74                     mapper.readValue(record.value(), TransferStateChangedEvent.class);
75                 getTransferUseCase
76                     .get(stateChangedEvent.getId())
77                     .ifPresentOrElse(
78                         transfer -> handleStateChangeUseCase.handle(transfer.setState(stateChangedEvent.getState())),
79                         () -> log.error("unknown transfer: {}", stateChangedEvent.getId()));
80                 break;
81             }
82           }
83           catch (JsonProcessingException e)
84           {
85             log.error(
86                 "ignoring invalid json in message #{} on {}/{}: {}",
87                 record.offset(),
88                 record.topic(),
89                 record.partition(),
90                 record.value());
91           }
92         });
93       }
94       catch (WakeupException e)
95       {
96         log.info("polling aborted!");
97       }
98     }
99
100     log.info("polling stopped");
101   }
102
103
104   @PostMapping("start")
105   public synchronized String start()
106   {
107     String result = "Started";
108
109     if (running)
110     {
111       stop();
112       result = "Restarted";
113     }
114
115     log.info("subscribing to topic {}", topic);
116     consumer.subscribe(Set.of(topic));
117     running = true;
118     future = executorService.submit(this);
119
120     return result;
121   }
122
123   @PostMapping("stop")
124   public synchronized String stop()
125   {
126     if (!running)
127     {
128       log.info("not running!");
129       return "Not running";
130     }
131
132     running = false;
133     if (!future.isDone())
134       consumer.wakeup();
135     log.info("waiting for the polling-loop to finish...");
136     try
137     {
138       future.get();
139     }
140     catch (InterruptedException|ExecutionException e)
141     {
142       log.error("Exception while joining polling task!", e);
143       return e.getMessage();
144     }
145     finally
146     {
147       future = null;
148       log.info("unsubscribing");
149       consumer.unsubscribe();
150     }
151
152     return "Stoped";
153   }
154
155   public synchronized void shutdown()
156   {
157     log.info("shutdown initiated!");
158     stop();
159     log.info("closing consumer");
160     consumer.close();
161   }
162 }