// The restore-process no longer happens inside onPartitionsAssigned()
// [demos/kafka/demos-kafka-payment-system-transfer] src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java
package de.juplo.kafka.payment.transfer.adapter;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.payment.transfer.domain.Transfer;
import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
import de.juplo.kafka.payment.transfer.ports.TransferRepository;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Header;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;


35 @RequestMapping("/consumer")
36 @ResponseBody
37 @Slf4j
38 public class TransferConsumer implements Runnable, ConsumerRebalanceListener
39 {
40   private final String topic;
41   private final int numPartitions;
42   private final KafkaConsumer<String, String> consumer;
43   private final AdminClient adminClient;
44   private final TransferRepository repository;
45   private final ObjectMapper mapper;
46   private final ConsumerUseCases restoreUseCases;
47
48   private boolean running = false;
49   private boolean shutdown = false;
50   private Future<?> future = null;
51
52   private final String groupId;
53   private final String groupInstanceId;
54   private final Map<String, String> instanceIdUriMapping;
55   private final String[] instanceIdByPartition;
56
57
58   private Clock clock;
59   private int stateStoreInterval;
60
61   private final Consumer<ConsumerRecord<String, String>> productionRecordHandler;
62   private final Consumer<ConsumerRecord<String, String>> recordHandlers[];
63
64   private volatile boolean partitionOwnershipUnknown = true;
65
66
67   public TransferConsumer(
68       String topic,
69       int numPartitions,
70       Map<String, String> instanceIdUriMapping,
71       KafkaConsumer<String, String> consumer,
72       AdminClient adminClient,
73       TransferRepository repository,
74       Clock clock,
75       int stateStoreInterval,
76       ObjectMapper mapper,
77       ConsumerUseCases productionUseCases,
78       ConsumerUseCases restoreUseCases)
79   {
80     this.topic = topic;
81     this.numPartitions = numPartitions;
82     this.groupId = consumer.groupMetadata().groupId();
83     this.groupInstanceId = consumer.groupMetadata().groupInstanceId().get();
84     this.instanceIdByPartition = new String[numPartitions];
85     this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size());
86     for (String instanceId : instanceIdUriMapping.keySet())
87     {
88       // Requests are not redirected for the instance itself
89       String uri = instanceId.equals(groupInstanceId)
90           ? null
91           : instanceIdUriMapping.get(instanceId);
92       this.instanceIdUriMapping.put(instanceId, uri);
93     }
94     this.consumer = consumer;
95     this.adminClient = adminClient;
96     this.repository = repository;
97     this.clock = clock;
98     this.stateStoreInterval = stateStoreInterval;
99     this.mapper = mapper;
100     this.restoreUseCases = restoreUseCases;
101
102     productionRecordHandler = (record) -> handleRecord(record, productionUseCases);
103     this.recordHandlers = new Consumer[numPartitions];
104   }
105
106
107   @Override
108   public void run()
109   {
110     Instant stateStored = clock.instant();
111
112     while (running)
113     {
114       try
115       {
116         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
117         if (records.count() == 0)
118           continue;
119
120         log.debug("polled {} records", records.count());
121         records.forEach(record -> recordHandlers[record.partition()].accept(record));
122
123         Instant now = clock.instant();
124         if (
125             stateStoreInterval > 0 &&
126             Duration.between(stateStored, now).getSeconds() >= stateStoreInterval)
127         {
128           Map<Integer, Long> offsets = new HashMap<>();
129
130           for (TopicPartition topicPartition : consumer.assignment())
131           {
132             Integer partition = topicPartition.partition();
133             Long offset = consumer.position(topicPartition);
134             log.info("storing state locally for {}/{}: {}", topic, partition, offset);
135             offsets.put(partition, offset);
136           }
137
138           repository.storeState(offsets);
139           stateStored = now;
140         }
141       }
142       catch (WakeupException e)
143       {
144         log.info("cleanly interrupted while polling");
145       }
146     }
147
148     log.info("polling stopped");
149   }
150
151   private void handleRecord(ConsumerRecord<String, String> record, ConsumerUseCases useCases)
152   {
153     try
154     {
155       byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
156
157       switch (eventType)
158       {
159         case EventType.NEW_TRANSFER:
160
161           NewTransferEvent newTransferEvent =
162               mapper.readValue(record.value(), NewTransferEvent.class);
163           useCases
164               .create(
165                   newTransferEvent.getId(),
166                   newTransferEvent.getPayer(),
167                   newTransferEvent.getPayee(),
168                   newTransferEvent.getAmount());
169           break;
170
171         case EventType.TRANSFER_STATE_CHANGED:
172
173           TransferStateChangedEvent stateChangedEvent =
174               mapper.readValue(record.value(), TransferStateChangedEvent.class);
175           useCases.handleStateChange(stateChangedEvent.getId(), stateChangedEvent.getState());
176           break;
177       }
178     }
179     catch (JsonProcessingException e)
180     {
181       log.error(
182           "ignoring invalid json in message #{} on {}/{}: {}",
183           record.offset(),
184           record.topic(),
185           record.partition(),
186           record.value());
187     }
188     catch (IllegalArgumentException e)
189     {
190       log.error(
191           "ignoring invalid message #{} on {}/{}: {}, message={}",
192           record.offset(),
193           record.topic(),
194           record.partition(),
195           e.getMessage(),
196           record.value());
197     }
198   }
199
200
201   /**
202    * Identifies the URI, at which the Group-Instance can be reached,
203    * that holds the state for a specific {@link Transfer}.
204    *
205    * The {@link Transfer#getId() ID} of the {@link Transfer} is named
206    * {@code key} here and of type {@code String}, because this example
207    * project stores the key as a String in Kafka to simplify the listing
208    * and manual manipulation of the according topic.
209    *
210    * @param key A {@code String}, that represents the {@link Transfer#getId() ID} of a {@link Transfer}.
211    * @return An {@link Optional}, that holds the URI at which the Group-Instance
212    * can be reached, that holds the state for the {@link Transfer}, that
213    * is identified by the key (if present), or is empty, if the {@link Transfer}
214    * would be handled by the local instance.
215    */
216   public Optional<String> uriForKey(String key)
217   {
218     synchronized (this)
219     {
220       while (partitionOwnershipUnknown)
221       {
222         try { wait(); } catch (InterruptedException e) {}
223       }
224
225       int partition = TransferPartitioner.computeHashForKey(key, numPartitions);
226       return
227           Optional
228               .ofNullable(instanceIdByPartition[partition])
229               .map(id -> instanceIdUriMapping.get(id));
230     }
231   }
232
233   @EventListener
234   public synchronized void onApplicationEvent(ContextRefreshedEvent event)
235   {
236     // "Needed", because this method is called synchronously during the
237     // initialization pahse of Spring. If the subscription happens
238     // in the same thread, it would block the completion of the initialization.
239     // Hence, the app would not react to any signal (CTRL-C, for example) except
240     // a KILL until the restoring is finished.
241     future = CompletableFuture.runAsync(() -> start());
242     log.info("start of application completed");
243   }
244
245
246   @Override
247   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
248   {
249     partitionOwnershipUnknown = true;
250     log.info("partitions revoked: {}", partitions);
251     for (TopicPartition topicPartition : partitions)
252     {
253       int partition = topicPartition.partition();
254       long offset = consumer.position(topicPartition);
255       log.info("deactivating partition {}, offset: {}", partition, offset);
256       repository.deactivatePartition(partition, offset);
257     }
258   }
259
260   @Override
261   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
262   {
263     log.info("partitions assigned: {}", partitions);
264     fetchAssignmentsAsync();
265     if (partitions.size() > 0)
266     {
267       for (Map.Entry<TopicPartition, Long> entry : consumer.endOffsets(partitions).entrySet())
268       {
269         TopicPartition topicPartition = entry.getKey();
270         Integer partition = topicPartition.partition();
271         long offset = repository.activatePartition(partition);
272         log.info("activated partition {}, seeking to offset {}", partition, offset);
273         consumer.seek(topicPartition, offset);
274         Long endOffset = entry.getValue();
275         if (offset < endOffset)
276         {
277           log.info("--> starting restore of partition {}: {} -> {}", partition, offset, endOffset);
278           recordHandlers[partition] = new RestoreRecordHandler(endOffset);
279         }
280         else
281         {
282           log.info("--> partition {} is up-to-date, offset: {}", partition, offset);
283           recordHandlers[partition] = productionRecordHandler;
284         }
285       }
286     }
287   }
288
289   private void fetchAssignmentsAsync()
290   {
291     adminClient
292         .describeConsumerGroups(List.of(groupId))
293         .describedGroups()
294         .get(groupId)
295         .whenComplete((descriptions, e) ->
296         {
297           if (e != null)
298           {
299             log.error("could not fetch group data: {}", e.getMessage());
300           }
301           else
302           {
303             synchronized (this)
304             {
305               for (MemberDescription description : descriptions.members())
306               {
307                 description
308                     .assignment()
309                     .topicPartitions()
310                     .forEach(tp -> instanceIdByPartition[tp.partition()] = description.groupInstanceId().get());
311               }
312               partitionOwnershipUnknown = false;
313               notifyAll();
314             }
315           }
316         });
317   }
318
319   @Override
320   public void onPartitionsLost(Collection<TopicPartition> partitions)
321   {
322     partitionOwnershipUnknown = true;
323     log.info("partiotions lost: {}", partitions);
324   }
325
326
327   class RestoreRecordHandler implements Consumer<ConsumerRecord<String, String>>
328   {
329     final long seen;
330
331
332     RestoreRecordHandler(Long endOffset)
333     {
334       this.seen = endOffset - 1;
335     }
336
337
338     @Override
339     public void accept(ConsumerRecord<String, String> record)
340     {
341       if (seen < record.offset())
342       {
343         int partition = record.partition();
344         log.info(
345             "--> restore of partition {} completed: needed={}, seen={}!",
346             partition,
347             seen,
348             record.offset());
349         recordHandlers[partition] = productionRecordHandler;
350         productionRecordHandler.accept(record);
351       }
352       else
353       {
354         handleRecord(record, restoreUseCases);
355         if (seen == record.offset())
356         {
357           int partition = record.partition();
358           log.info( "--> restore of partition {} completed!", partition);
359           recordHandlers[partition] = productionRecordHandler;
360         }
361       }
362     }
363   }
364
365
366   @PostMapping("start")
367   public synchronized String start()
368   {
369     if (running)
370     {
371       log.info("consumer already running!");
372       return "Already running!";
373     }
374
375     int foundNumPartitions = consumer.partitionsFor(topic).size();
376     if (foundNumPartitions != numPartitions)
377     {
378       log.error(
379           "unexpected number of partitions for topic {}: expected={}, found={}",
380           topic,
381           numPartitions,
382           foundNumPartitions
383           );
384       return "Wrong number of partitions for topic " + topic + ": " + foundNumPartitions;
385     }
386
387     consumer.subscribe(List.of(topic), this);
388
389     running = true;
390     future = CompletableFuture.runAsync(this);
391
392     log.info("consumer started");
393     return "Started";
394   }
395
396   @PostMapping("stop")
397   public synchronized String stop()
398   {
399     if (!running)
400     {
401       log.info("consumer not running!");
402       return "Not running";
403     }
404
405     running = false;
406
407     if (!future.isDone())
408       consumer.wakeup();
409
410     log.info("waiting for the consumer...");
411     try
412     {
413       future.get();
414     }
415     catch (InterruptedException|ExecutionException e)
416     {
417       log.error("Exception while joining polling task!", e);
418       return e.getMessage();
419     }
420     finally
421     {
422       future = null;
423       consumer.unsubscribe();
424     }
425
426     log.info("consumer stopped");
427     return "Stopped";
428   }
429
430   public synchronized void shutdown()
431   {
432     log.info("shutdown initiated!");
433     shutdown = true;
434     stop();
435     log.info("closing consumer");
436     consumer.close();
437   }
438
439
440
441   public interface ConsumerUseCases
442       extends
443         GetTransferUseCase,
444         CreateTransferUseCase,
445         HandleStateChangeUseCase {};
446 }