Refined the feedback-messages of the TransferConsumer
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / adapter / TransferConsumer.java
1 package de.juplo.kafka.payment.transfer.adapter;
2
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
6 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
7 import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
8 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
9 import lombok.extern.slf4j.Slf4j;
10 import org.apache.kafka.clients.admin.AdminClient;
11 import org.apache.kafka.clients.admin.MemberDescription;
12 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
13 import org.apache.kafka.clients.consumer.ConsumerRecord;
14 import org.apache.kafka.clients.consumer.ConsumerRecords;
15 import org.apache.kafka.clients.consumer.KafkaConsumer;
16 import org.apache.kafka.common.TopicPartition;
17 import org.apache.kafka.common.errors.WakeupException;
18 import org.springframework.context.event.ContextRefreshedEvent;
19 import org.springframework.context.event.EventListener;
20 import org.springframework.web.bind.annotation.PostMapping;
21 import org.springframework.web.bind.annotation.RequestMapping;
22 import org.springframework.web.bind.annotation.ResponseBody;
23
24 import java.time.Duration;
25 import java.util.*;
26 import java.util.concurrent.CompletableFuture;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.Future;
29 import java.util.stream.Collectors;
30
31
32 @RequestMapping("/consumer")
33 @ResponseBody
34 @Slf4j
35 public class TransferConsumer implements Runnable, ConsumerRebalanceListener
36 {
37   private final String topic;
38   private final int numPartitions;
39   private final KafkaConsumer<String, String> consumer;
40   private final AdminClient adminClient;
41   private final TransferRepository repository;
42   private final ObjectMapper mapper;
43   private final ConsumerUseCases productionUseCases, restoreUseCases;
44
45   private boolean running = false;
46   private boolean shutdown = false;
47   private Future<?> future = null;
48
49   private final String groupId;
50   private final String groupInstanceId;
51   private final Map<String, String> instanceIdUriMapping;
52   private final String[] instanceIdByPartition;
53
54   private volatile boolean partitionOwnershipUnknown = true;
55
56
57   public TransferConsumer(
58       String topic,
59       int numPartitions,
60       Map<String, String> instanceIdUriMapping,
61       KafkaConsumer<String, String> consumer,
62       AdminClient adminClient,
63       TransferRepository repository,
64       ObjectMapper mapper,
65       ConsumerUseCases productionUseCases,
66       ConsumerUseCases restoreUseCases)
67   {
68     this.topic = topic;
69     this.numPartitions = numPartitions;
70     this.groupId = consumer.groupMetadata().groupId();
71     this.groupInstanceId = consumer.groupMetadata().groupInstanceId().get();
72     this.instanceIdByPartition = new String[numPartitions];
73     this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size());
74     for (String instanceId : instanceIdUriMapping.keySet())
75     {
76       // Requests are not redirected for the instance itself
77       String uri = instanceId.equals(groupInstanceId)
78           ? null
79           : instanceIdUriMapping.get(instanceId);
80       this.instanceIdUriMapping.put(instanceId, uri);
81     }
82     this.consumer = consumer;
83     this.adminClient = adminClient;
84     this.repository = repository;
85     this.mapper = mapper;
86     this.productionUseCases = productionUseCases;
87     this.restoreUseCases = restoreUseCases;
88   }
89
90
91   @Override
92   public void run()
93   {
94     while (running)
95     {
96       try
97       {
98         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
99         if (records.count() == 0)
100           continue;
101
102         log.debug("polled {} records", records.count());
103         records.forEach(record -> handleRecord(record, productionUseCases));
104       }
105       catch (WakeupException e)
106       {
107         log.info("cleanly interrupted while polling");
108       }
109     }
110
111     log.info("polling stopped");
112   }
113
114   private void handleRecord(ConsumerRecord<String, String> record, ConsumerUseCases useCases)
115   {
116     try
117     {
118       byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
119
120       switch (eventType)
121       {
122         case EventType.NEW_TRANSFER:
123
124           NewTransferEvent newTransferEvent =
125               mapper.readValue(record.value(), NewTransferEvent.class);
126           useCases
127               .create(
128                   newTransferEvent.getId(),
129                   newTransferEvent.getPayer(),
130                   newTransferEvent.getPayee(),
131                   newTransferEvent.getAmount());
132           break;
133
134         case EventType.TRANSFER_STATE_CHANGED:
135
136           TransferStateChangedEvent stateChangedEvent =
137               mapper.readValue(record.value(), TransferStateChangedEvent.class);
138           useCases.handleStateChange(stateChangedEvent.getId(), stateChangedEvent.getState());
139           break;
140       }
141     }
142     catch (JsonProcessingException e)
143     {
144       log.error(
145           "ignoring invalid json in message #{} on {}/{}: {}",
146           record.offset(),
147           record.topic(),
148           record.partition(),
149           record.value());
150     }
151     catch (IllegalArgumentException e)
152     {
153       log.error(
154           "ignoring invalid message #{} on {}/{}: {}, message={}",
155           record.offset(),
156           record.topic(),
157           record.partition(),
158           e.getMessage(),
159           record.value());
160     }
161   }
162
163
164   public Optional<String> uriForKey(String key)
165   {
166     synchronized (this)
167     {
168       while (partitionOwnershipUnknown)
169       {
170         try { wait(); } catch (InterruptedException e) {}
171       }
172
173       int partition = TransferPartitioner.computeHashForKey(key, numPartitions);
174       return
175           Optional
176               .ofNullable(instanceIdByPartition[partition])
177               .map(id -> instanceIdUriMapping.get(id));
178     }
179   }
180
181   @EventListener
182   public synchronized void onApplicationEvent(ContextRefreshedEvent event)
183   {
184     // "Needed", because this method is called synchronously during the
185     // initialization pahse of Spring. If the subscription happens
186     // in the same thread, it would block the completion of the initialization.
187     // Hence, the app would not react to any signal (CTRL-C, for example) except
188     // a KILL until the restoring is finished.
189     future = CompletableFuture.runAsync(() -> start());
190     log.info("start of application completed");
191   }
192
193
194   @Override
195   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
196   {
197     partitionOwnershipUnknown = true;
198     log.info("partitions revoked: {}", partitions);
199   }
200
201   @Override
202   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
203   {
204     log.info("partitions assigned: {}", partitions);
205     fetchAssignmentsAsync();
206     if (partitions.size() > 0)
207       restore(partitions);
208   }
209
210   private void fetchAssignmentsAsync()
211   {
212     adminClient
213         .describeConsumerGroups(List.of(groupId))
214         .describedGroups()
215         .get(groupId)
216         .whenComplete((descriptions, e) ->
217         {
218           if (e != null)
219           {
220             log.error("could not fetch group data: {}", e.getMessage());
221           }
222           else
223           {
224             synchronized (this)
225             {
226               for (MemberDescription description : descriptions.members())
227               {
228                 description
229                     .assignment()
230                     .topicPartitions()
231                     .forEach(tp -> instanceIdByPartition[tp.partition()] = description.groupInstanceId().get());
232               }
233               partitionOwnershipUnknown = false;
234               notifyAll();
235             }
236           }
237         });
238   }
239
240   @Override
241   public void onPartitionsLost(Collection<TopicPartition> partitions)
242   {
243     partitionOwnershipUnknown = true;
244     log.info("partiotions lost: {}", partitions);
245   }
246
247
248   private void restore(Collection<TopicPartition> partitions)
249   {
250     log.info("--> starting restore...");
251
252     partitions
253         .stream()
254         .map(topicPartition -> topicPartition.partition())
255         .forEach(partition -> repository.resetStorageForPartition(partition));
256
257     Map<Integer, Long> lastSeen =
258         consumer
259             .endOffsets(partitions)
260             .entrySet()
261             .stream()
262             .collect(Collectors.toMap(
263                 entry -> entry.getKey().partition(),
264                 entry -> entry.getValue() - 1));
265
266     Map<Integer, Long> positions =
267         lastSeen
268             .keySet()
269             .stream()
270             .collect(Collectors.toMap(
271                 partition -> partition,
272                 partition -> 0l));
273
274     while (
275         positions
276             .entrySet()
277             .stream()
278             .map(entry -> entry.getValue() < lastSeen.get(entry.getKey()))
279             .reduce(false, (a, b) -> a || b))
280     {
281       try
282       {
283         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
284         if (records.count() == 0)
285           continue;
286
287         log.debug("polled {} records", records.count());
288         records.forEach(record ->
289         {
290           handleRecord(record, restoreUseCases);
291           positions.put(record.partition(), record.offset());
292         });
293       }
294       catch(WakeupException e)
295       {
296         log.info("--> cleanly interrupted while restoring");
297       }
298     }
299
300     log.info("--> restore completed!");
301   }
302
303   @PostMapping("start")
304   public synchronized String start()
305   {
306     if (running)
307     {
308       log.info("consumer already running!");
309       return "Already running!";
310     }
311
312     int foundNumPartitions = consumer.partitionsFor(topic).size();
313     if (foundNumPartitions != numPartitions)
314     {
315       log.error(
316           "unexpected number of partitions for topic {}: expected={}, found={}",
317           topic,
318           numPartitions,
319           foundNumPartitions
320           );
321       return "Wrong number of partitions for topic " + topic + ": " + foundNumPartitions;
322     }
323
324     consumer.subscribe(List.of(topic), this);
325
326     running = true;
327     future = CompletableFuture.runAsync(this);
328
329     log.info("consumer started");
330     return "Started";
331   }
332
333   @PostMapping("stop")
334   public synchronized String stop()
335   {
336     if (!running)
337     {
338       log.info("consumer not running!");
339       return "Not running";
340     }
341
342     running = false;
343
344     if (!future.isDone())
345       consumer.wakeup();
346
347     log.info("waiting for the consumer...");
348     try
349     {
350       future.get();
351     }
352     catch (InterruptedException|ExecutionException e)
353     {
354       log.error("Exception while joining polling task!", e);
355       return e.getMessage();
356     }
357     finally
358     {
359       future = null;
360       consumer.unsubscribe();
361     }
362
363     log.info("consumer stopped");
364     return "Stopped";
365   }
366
367   public synchronized void shutdown()
368   {
369     log.info("shutdown initiated!");
370     shutdown = true;
371     stop();
372     log.info("closing consumer");
373     consumer.close();
374   }
375
376
377
378   public interface ConsumerUseCases
379       extends
380         GetTransferUseCase,
381         CreateTransferUseCase,
382         HandleStateChangeUseCase {};
383 }