Bugfix: Check for existence of a new transfer requires a remote-call
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / adapter / TransferConsumer.java
1 package de.juplo.kafka.payment.transfer.adapter;
2
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.domain.Transfer;
6 import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
7 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
8 import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
9 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
10 import lombok.extern.slf4j.Slf4j;
11 import org.apache.kafka.clients.admin.AdminClient;
12 import org.apache.kafka.clients.admin.MemberDescription;
13 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
14 import org.apache.kafka.clients.consumer.ConsumerRecord;
15 import org.apache.kafka.clients.consumer.ConsumerRecords;
16 import org.apache.kafka.clients.consumer.KafkaConsumer;
17 import org.apache.kafka.common.TopicPartition;
18 import org.apache.kafka.common.errors.WakeupException;
19 import org.springframework.context.event.ContextRefreshedEvent;
20 import org.springframework.context.event.EventListener;
21 import org.springframework.web.bind.annotation.PostMapping;
22 import org.springframework.web.bind.annotation.RequestMapping;
23 import org.springframework.web.bind.annotation.ResponseBody;
24
25 import java.time.Clock;
26 import java.time.Duration;
27 import java.time.Instant;
28 import java.util.*;
29 import java.util.concurrent.CompletableFuture;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.Future;
32 import java.util.stream.Collectors;
33
34
35 @RequestMapping("/consumer")
36 @ResponseBody
37 @Slf4j
38 public class TransferConsumer implements Runnable, ConsumerRebalanceListener
39 {
40   private final String topic;
41   private final int numPartitions;
42   private final KafkaConsumer<String, String> consumer;
43   private final AdminClient adminClient;
44   private final TransferRepository repository;
45   private final ObjectMapper mapper;
46   private final ConsumerUseCases productionUseCases, restoreUseCases;
47
48   private boolean running = false;
49   private boolean shutdown = false;
50   private Future<?> future = null;
51
52   private final String groupId;
53   private final String groupInstanceId;
54   private final Map<String, String> instanceIdUriMapping;
55   private final String[] instanceIdByPartition;
56
57   private Clock clock;
58   private int stateStoreInterval;
59
60   private volatile boolean partitionOwnershipUnknown = true;
61
62
63   public TransferConsumer(
64       String topic,
65       int numPartitions,
66       Map<String, String> instanceIdUriMapping,
67       KafkaConsumer<String, String> consumer,
68       AdminClient adminClient,
69       TransferRepository repository,
70       Clock clock,
71       int stateStoreInterval,
72       ObjectMapper mapper,
73       ConsumerUseCases productionUseCases,
74       ConsumerUseCases restoreUseCases)
75   {
76     this.topic = topic;
77     this.numPartitions = numPartitions;
78     this.groupId = consumer.groupMetadata().groupId();
79     this.groupInstanceId = consumer.groupMetadata().groupInstanceId().get();
80     this.instanceIdByPartition = new String[numPartitions];
81     this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size());
82     for (String instanceId : instanceIdUriMapping.keySet())
83     {
84       // Requests are not redirected for the instance itself
85       String uri = instanceId.equals(groupInstanceId)
86           ? null
87           : instanceIdUriMapping.get(instanceId);
88       this.instanceIdUriMapping.put(instanceId, uri);
89     }
90     this.consumer = consumer;
91     this.adminClient = adminClient;
92     this.repository = repository;
93     this.clock = clock;
94     this.stateStoreInterval = stateStoreInterval;
95     this.mapper = mapper;
96     this.productionUseCases = productionUseCases;
97     this.restoreUseCases = restoreUseCases;
98   }
99
100
101   @Override
102   public void run()
103   {
104     Instant stateStored = clock.instant();
105
106     while (running)
107     {
108       try
109       {
110         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
111         if (records.count() == 0)
112           continue;
113
114         log.debug("polled {} records", records.count());
115         records.forEach(record -> handleRecord(record, productionUseCases));
116
117         Instant now = clock.instant();
118         if (
119             stateStoreInterval > 0 &&
120             Duration.between(stateStored, now).getSeconds() >= stateStoreInterval)
121         {
122           Map<Integer, Long> offsets = new HashMap<>();
123
124           for (TopicPartition topicPartition : consumer.assignment())
125           {
126             Integer partition = topicPartition.partition();
127             Long offset = consumer.position(topicPartition);
128             log.info("storing state locally for {}/{}: {}", topic, partition, offset);
129             offsets.put(partition, offset);
130           }
131
132           repository.storeState(offsets);
133           stateStored = now;
134         }
135       }
136       catch (WakeupException e)
137       {
138         log.info("cleanly interrupted while polling");
139       }
140     }
141
142     log.info("polling stopped");
143   }
144
145   private void handleRecord(ConsumerRecord<String, String> record, ConsumerUseCases useCases)
146   {
147     try
148     {
149       byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
150
151       switch (eventType)
152       {
153         case EventType.NEW_TRANSFER:
154
155           NewTransferEvent newTransferEvent =
156               mapper.readValue(record.value(), NewTransferEvent.class);
157           useCases
158               .create(
159                   newTransferEvent.getId(),
160                   newTransferEvent.getPayer(),
161                   newTransferEvent.getPayee(),
162                   newTransferEvent.getAmount());
163           break;
164
165         case EventType.TRANSFER_STATE_CHANGED:
166
167           TransferStateChangedEvent stateChangedEvent =
168               mapper.readValue(record.value(), TransferStateChangedEvent.class);
169           useCases.handleStateChange(stateChangedEvent.getId(), stateChangedEvent.getState());
170           break;
171       }
172     }
173     catch (JsonProcessingException e)
174     {
175       log.error(
176           "ignoring invalid json in message #{} on {}/{}: {}",
177           record.offset(),
178           record.topic(),
179           record.partition(),
180           record.value());
181     }
182     catch (IllegalArgumentException e)
183     {
184       log.error(
185           "ignoring invalid message #{} on {}/{}: {}, message={}",
186           record.offset(),
187           record.topic(),
188           record.partition(),
189           e.getMessage(),
190           record.value());
191     }
192   }
193
194
195   /**
196    * Identifies the URI, at which the Group-Instance can be reached,
197    * that holds the state for a specific {@link Transfer}.
198    *
199    * The {@link Transfer#getId() ID} of the {@link Transfer} is named
200    * {@code key} here and of type {@code String}, because this example
201    * project stores the key as a String in Kafka to simplify the listing
202    * and manual manipulation of the according topic.
203    *
204    * @param key A {@code String}, that represents the {@link Transfer#getId() ID} of a {@link Transfer}.
205    * @return An {@link Optional}, that holds the URI at which the Group-Instance
206    * can be reached, that holds the state for the {@link Transfer}, that
207    * is identified by the key (if present), or is empty, if the {@link Transfer}
208    * would be handled by the local instance.
209    */
210   public Optional<String> uriForKey(String key)
211   {
212     synchronized (this)
213     {
214       while (partitionOwnershipUnknown)
215       {
216         try { wait(); } catch (InterruptedException e) {}
217       }
218
219       int partition = TransferPartitioner.computeHashForKey(key, numPartitions);
220       return
221           Optional
222               .ofNullable(instanceIdByPartition[partition])
223               .map(id -> instanceIdUriMapping.get(id));
224     }
225   }
226
227   @EventListener
228   public synchronized void onApplicationEvent(ContextRefreshedEvent event)
229   {
230     // "Needed", because this method is called synchronously during the
231     // initialization pahse of Spring. If the subscription happens
232     // in the same thread, it would block the completion of the initialization.
233     // Hence, the app would not react to any signal (CTRL-C, for example) except
234     // a KILL until the restoring is finished.
235     future = CompletableFuture.runAsync(() -> start());
236     log.info("start of application completed");
237   }
238
239
240   @Override
241   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
242   {
243     partitionOwnershipUnknown = true;
244     log.info("partitions revoked: {}", partitions);
245     for (TopicPartition topicPartition : partitions)
246     {
247       int partition = topicPartition.partition();
248       long offset = consumer.position(topicPartition);
249       log.info("deactivating partition {}, offset: {}", partition, offset);
250       repository.deactivatePartition(partition, offset);
251     }
252   }
253
254   @Override
255   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
256   {
257     log.info("partitions assigned: {}", partitions);
258     fetchAssignmentsAsync();
259     if (partitions.size() > 0)
260     {
261       for (TopicPartition topicPartition : partitions)
262       {
263         int partition = topicPartition.partition();
264         long offset = repository.activatePartition(partition);
265         log.info("activated partition {}, seeking to offset {}", partition, offset);
266         consumer.seek(topicPartition, offset);
267       }
268
269       restore(partitions);
270     }
271   }
272
273   private void fetchAssignmentsAsync()
274   {
275     adminClient
276         .describeConsumerGroups(List.of(groupId))
277         .describedGroups()
278         .get(groupId)
279         .whenComplete((descriptions, e) ->
280         {
281           if (e != null)
282           {
283             log.error("could not fetch group data: {}", e.getMessage());
284           }
285           else
286           {
287             synchronized (this)
288             {
289               for (MemberDescription description : descriptions.members())
290               {
291                 description
292                     .assignment()
293                     .topicPartitions()
294                     .forEach(tp -> instanceIdByPartition[tp.partition()] = description.groupInstanceId().get());
295               }
296               partitionOwnershipUnknown = false;
297               notifyAll();
298             }
299           }
300         });
301   }
302
303   @Override
304   public void onPartitionsLost(Collection<TopicPartition> partitions)
305   {
306     partitionOwnershipUnknown = true;
307     log.info("partiotions lost: {}", partitions);
308   }
309
310
311   private void restore(Collection<TopicPartition> partitions)
312   {
313     log.info("--> starting restore...");
314
315     Map<Integer, Long> lastSeen =
316         consumer
317             .endOffsets(partitions)
318             .entrySet()
319             .stream()
320             .collect(Collectors.toMap(
321                 entry -> entry.getKey().partition(),
322                 entry -> entry.getValue() - 1));
323
324     Map<Integer, Long> positions =
325         lastSeen
326             .keySet()
327             .stream()
328             .collect(Collectors.toMap(
329                 partition -> partition,
330                 partition -> repository.storedPosition(partition)));
331
332     while (
333         positions
334             .entrySet()
335             .stream()
336             .map(entry -> entry.getValue() < lastSeen.get(entry.getKey()))
337             .reduce(false, (a, b) -> a || b))
338     {
339       try
340       {
341         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
342         if (records.count() == 0)
343           continue;
344
345         log.debug("polled {} records", records.count());
346         records.forEach(record ->
347         {
348           handleRecord(record, restoreUseCases);
349           positions.put(record.partition(), record.offset());
350         });
351       }
352       catch(WakeupException e)
353       {
354         log.info("--> cleanly interrupted while restoring");
355       }
356     }
357
358     log.info("--> restore completed!");
359   }
360
361   @PostMapping("start")
362   public synchronized String start()
363   {
364     if (running)
365     {
366       log.info("consumer already running!");
367       return "Already running!";
368     }
369
370     int foundNumPartitions = consumer.partitionsFor(topic).size();
371     if (foundNumPartitions != numPartitions)
372     {
373       log.error(
374           "unexpected number of partitions for topic {}: expected={}, found={}",
375           topic,
376           numPartitions,
377           foundNumPartitions
378           );
379       return "Wrong number of partitions for topic " + topic + ": " + foundNumPartitions;
380     }
381
382     consumer.subscribe(List.of(topic), this);
383
384     running = true;
385     future = CompletableFuture.runAsync(this);
386
387     log.info("consumer started");
388     return "Started";
389   }
390
391   @PostMapping("stop")
392   public synchronized String stop()
393   {
394     if (!running)
395     {
396       log.info("consumer not running!");
397       return "Not running";
398     }
399
400     running = false;
401
402     if (!future.isDone())
403       consumer.wakeup();
404
405     log.info("waiting for the consumer...");
406     try
407     {
408       future.get();
409     }
410     catch (InterruptedException|ExecutionException e)
411     {
412       log.error("Exception while joining polling task!", e);
413       return e.getMessage();
414     }
415     finally
416     {
417       future = null;
418       consumer.unsubscribe();
419     }
420
421     log.info("consumer stopped");
422     return "Stopped";
423   }
424
425   public synchronized void shutdown()
426   {
427     log.info("shutdown initiated!");
428     shutdown = true;
429     stop();
430     log.info("closing consumer");
431     consumer.close();
432   }
433
434
435
436   public interface ConsumerUseCases
437       extends
438         GetTransferUseCase,
439         CreateTransferUseCase,
440         HandleStateChangeUseCase {};
441 }