WIP: instance-mapping from assignor
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / adapter / TransferConsumer.java
1 package de.juplo.kafka.payment.transfer.adapter;
2
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.domain.Transfer;
6 import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
7 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
8 import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
9 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
10 import lombok.extern.slf4j.Slf4j;
11 import org.apache.kafka.clients.admin.AdminClient;
12 import org.apache.kafka.clients.admin.MemberDescription;
13 import org.apache.kafka.clients.consumer.*;
14 import org.apache.kafka.common.Cluster;
15 import org.apache.kafka.common.TopicPartition;
16 import org.apache.kafka.common.errors.WakeupException;
17 import org.apache.kafka.common.serialization.StringDeserializer;
18 import org.springframework.context.event.ContextRefreshedEvent;
19 import org.springframework.context.event.EventListener;
20 import org.springframework.web.bind.annotation.PostMapping;
21 import org.springframework.web.bind.annotation.RequestMapping;
22 import org.springframework.web.bind.annotation.ResponseBody;
23
24 import java.nio.ByteBuffer;
25 import java.time.Clock;
26 import java.time.Duration;
27 import java.time.Instant;
28 import java.util.*;
29 import java.util.concurrent.CompletableFuture;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.Future;
32 import java.util.stream.Collectors;
33
34
35 @RequestMapping("/consumer")
36 @ResponseBody
37 @Slf4j
38 public class TransferConsumer implements Runnable, ConsumerRebalanceListener
39 {
40   private final String topic;
41   private final int numPartitions;
42   private final KafkaConsumer<String, String> consumer;
43   private final AdminClient adminClient;
44   private final TransferRepository repository;
45   private final ObjectMapper mapper;
46   private final ConsumerUseCases productionUseCases, restoreUseCases;
47
48   private boolean running = false;
49   private boolean shutdown = false;
50   private Future<?> future = null;
51
52   private final String groupId;
53   private final String groupInstanceId;
54   private final Map<String, String> instanceIdUriMapping;
55   private final String[] instanceIdByPartition;
56
57   private Clock clock;
58   private int stateStoreInterval;
59
60   private volatile boolean partitionOwnershipUnknown = true;
61
62
63   public TransferConsumer(
64       String bootstrapServers,
65       String groupId,
66       String groupInstanceId,
67       String topic,
68       int numPartitions,
69       Map<String, String> instanceIdUriMapping,
70       AdminClient adminClient,
71       TransferRepository repository,
72       Clock clock,
73       int stateStoreInterval,
74       ObjectMapper mapper,
75       ConsumerUseCases productionUseCases,
76       ConsumerUseCases restoreUseCases)
77   {
78     Properties props = new Properties();
79     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
80     props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
81     props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
82     props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, new Assignor());
83     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
84     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
85     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
86     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
87
88     this.consumer = new KafkaConsumer<>(props).;
89
90     this.groupId = groupId;
91     this.groupInstanceId = groupInstanceId;
92     this.topic = topic;
93     this.numPartitions = numPartitions;
94     this.instanceIdByPartition = new String[numPartitions];
95     this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size());
96     for (String instanceId : instanceIdUriMapping.keySet())
97     {
98       // Requests are not redirected for the instance itself
99       String uri = instanceId.equals(groupInstanceId)
100           ? null
101           : instanceIdUriMapping.get(instanceId);
102       this.instanceIdUriMapping.put(instanceId, uri);
103     }
104     this.adminClient = adminClient;
105     this.repository = repository;
106     this.clock = clock;
107     this.stateStoreInterval = stateStoreInterval;
108     this.mapper = mapper;
109     this.productionUseCases = productionUseCases;
110     this.restoreUseCases = restoreUseCases;
111   }
112
113
114   @Override
115   public void run()
116   {
117     Instant stateStored = clock.instant();
118
119     while (running)
120     {
121       try
122       {
123         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
124         if (records.count() == 0)
125           continue;
126
127         log.debug("polled {} records", records.count());
128         records.forEach(record -> handleRecord(record, productionUseCases));
129
130         Instant now = clock.instant();
131         if (
132             stateStoreInterval > 0 &&
133             Duration.between(stateStored, now).getSeconds() >= stateStoreInterval)
134         {
135           Map<Integer, Long> offsets = new HashMap<>();
136
137           for (TopicPartition topicPartition : consumer.assignment())
138           {
139             Integer partition = topicPartition.partition();
140             Long offset = consumer.position(topicPartition);
141             log.info("storing state locally for {}/{}: {}", topic, partition, offset);
142             offsets.put(partition, offset);
143           }
144
145           repository.storeState(offsets);
146           stateStored = now;
147         }
148       }
149       catch (WakeupException e)
150       {
151         log.info("cleanly interrupted while polling");
152       }
153     }
154
155     log.info("polling stopped");
156   }
157
158   private void handleRecord(ConsumerRecord<String, String> record, ConsumerUseCases useCases)
159   {
160     try
161     {
162       byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
163
164       switch (eventType)
165       {
166         case EventType.NEW_TRANSFER:
167
168           NewTransferEvent newTransferEvent =
169               mapper.readValue(record.value(), NewTransferEvent.class);
170           useCases
171               .create(
172                   newTransferEvent.getId(),
173                   newTransferEvent.getPayer(),
174                   newTransferEvent.getPayee(),
175                   newTransferEvent.getAmount());
176           break;
177
178         case EventType.TRANSFER_STATE_CHANGED:
179
180           TransferStateChangedEvent stateChangedEvent =
181               mapper.readValue(record.value(), TransferStateChangedEvent.class);
182           useCases.handleStateChange(stateChangedEvent.getId(), stateChangedEvent.getState());
183           break;
184       }
185     }
186     catch (JsonProcessingException e)
187     {
188       log.error(
189           "ignoring invalid json in message #{} on {}/{}: {}",
190           record.offset(),
191           record.topic(),
192           record.partition(),
193           record.value());
194     }
195     catch (IllegalArgumentException e)
196     {
197       log.error(
198           "ignoring invalid message #{} on {}/{}: {}, message={}",
199           record.offset(),
200           record.topic(),
201           record.partition(),
202           e.getMessage(),
203           record.value());
204     }
205   }
206
207
208   /**
209    * Identifies the URI, at which the Group-Instance can be reached,
210    * that holds the state for a specific {@link Transfer}.
211    *
212    * The {@link Transfer#getId() ID} of the {@link Transfer} is named
213    * {@code key} here and of type {@code String}, because this example
214    * project stores the key as a String in Kafka to simplify the listing
215    * and manual manipulation of the according topic.
216    *
217    * @param key A {@code String}, that represents the {@link Transfer#getId() ID} of a {@link Transfer}.
218    * @return An {@link Optional}, that holds the URI at which the Group-Instance
219    * can be reached, that holds the state for the {@link Transfer}, that
220    * is identified by the key (if present), or is empty, if the {@link Transfer}
221    * would be handled by the local instance.
222    */
223   public Optional<String> uriForKey(String key)
224   {
225     synchronized (this)
226     {
227       while (partitionOwnershipUnknown)
228       {
229         try { wait(); } catch (InterruptedException e) {}
230       }
231
232       int partition = TransferPartitioner.computeHashForKey(key, numPartitions);
233       return
234           Optional
235               .ofNullable(instanceIdByPartition[partition])
236               .map(id -> instanceIdUriMapping.get(id));
237     }
238   }
239
240   @EventListener
241   public synchronized void onApplicationEvent(ContextRefreshedEvent event)
242   {
243     // "Needed", because this method is called synchronously during the
244     // initialization pahse of Spring. If the subscription happens
245     // in the same thread, it would block the completion of the initialization.
246     // Hence, the app would not react to any signal (CTRL-C, for example) except
247     // a KILL until the restoring is finished.
248     future = CompletableFuture.runAsync(() -> start());
249     log.info("start of application completed");
250   }
251
252
253   @Override
254   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
255   {
256     partitionOwnershipUnknown = true;
257     log.info("partitions revoked: {}", partitions);
258     for (TopicPartition topicPartition : partitions)
259     {
260       int partition = topicPartition.partition();
261       long offset = consumer.position(topicPartition);
262       log.info("deactivating partition {}, offset: {}", partition, offset);
263       repository.deactivatePartition(partition, offset);
264     }
265   }
266
267   @Override
268   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
269   {
270     log.info("partitions assigned: {}", partitions);
271     fetchAssignmentsAsync();
272     if (partitions.size() > 0)
273     {
274       for (TopicPartition topicPartition : partitions)
275       {
276         int partition = topicPartition.partition();
277         long offset = repository.activatePartition(partition);
278         log.info("activated partition {}, seeking to offset {}", partition, offset);
279         consumer.seek(topicPartition, offset);
280       }
281
282       restore(partitions);
283     }
284   }
285
286   private void fetchAssignmentsAsync()
287   {
288     adminClient
289         .describeConsumerGroups(List.of(groupId))
290         .describedGroups()
291         .get(groupId)
292         .whenComplete((descriptions, e) ->
293         {
294           if (e != null)
295           {
296             log.error("could not fetch group data: {}", e.getMessage());
297           }
298           else
299           {
300             synchronized (this)
301             {
302               for (MemberDescription description : descriptions.members())
303               {
304                 description
305                     .assignment()
306                     .topicPartitions()
307                     .forEach(tp -> instanceIdByPartition[tp.partition()] = description.groupInstanceId().get());
308               }
309               partitionOwnershipUnknown = false;
310               notifyAll();
311             }
312           }
313         });
314   }
315
316   @Override
317   public void onPartitionsLost(Collection<TopicPartition> partitions)
318   {
319     partitionOwnershipUnknown = true;
320     log.info("partiotions lost: {}", partitions);
321   }
322
323
324   private void restore(Collection<TopicPartition> partitions)
325   {
326     log.info("--> starting restore...");
327
328     Map<Integer, Long> lastSeen =
329         consumer
330             .endOffsets(partitions)
331             .entrySet()
332             .stream()
333             .collect(Collectors.toMap(
334                 entry -> entry.getKey().partition(),
335                 entry -> entry.getValue() - 1));
336
337     Map<Integer, Long> positions =
338         lastSeen
339             .keySet()
340             .stream()
341             .collect(Collectors.toMap(
342                 partition -> partition,
343                 partition -> repository.storedPosition(partition)));
344
345     while (
346         positions
347             .entrySet()
348             .stream()
349             .map(entry -> entry.getValue() < lastSeen.get(entry.getKey()))
350             .reduce(false, (a, b) -> a || b))
351     {
352       try
353       {
354         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
355         if (records.count() == 0)
356           continue;
357
358         log.debug("polled {} records", records.count());
359         records.forEach(record ->
360         {
361           handleRecord(record, restoreUseCases);
362           positions.put(record.partition(), record.offset());
363         });
364       }
365       catch(WakeupException e)
366       {
367         log.info("--> cleanly interrupted while restoring");
368       }
369     }
370
371     log.info("--> restore completed!");
372   }
373
374   @PostMapping("start")
375   public synchronized String start()
376   {
377     if (running)
378     {
379       log.info("consumer already running!");
380       return "Already running!";
381     }
382
383     int foundNumPartitions = consumer.partitionsFor(topic).size();
384     if (foundNumPartitions != numPartitions)
385     {
386       log.error(
387           "unexpected number of partitions for topic {}: expected={}, found={}",
388           topic,
389           numPartitions,
390           foundNumPartitions
391           );
392       return "Wrong number of partitions for topic " + topic + ": " + foundNumPartitions;
393     }
394
395     consumer.subscribe(List.of(topic), this);
396
397     running = true;
398     future = CompletableFuture.runAsync(this);
399
400     log.info("consumer started");
401     return "Started";
402   }
403
404   @PostMapping("stop")
405   public synchronized String stop()
406   {
407     if (!running)
408     {
409       log.info("consumer not running!");
410       return "Not running";
411     }
412
413     running = false;
414
415     if (!future.isDone())
416       consumer.wakeup();
417
418     log.info("waiting for the consumer...");
419     try
420     {
421       future.get();
422     }
423     catch (InterruptedException|ExecutionException e)
424     {
425       log.error("Exception while joining polling task!", e);
426       return e.getMessage();
427     }
428     finally
429     {
430       future = null;
431       consumer.unsubscribe();
432     }
433
434     log.info("consumer stopped");
435     return "Stopped";
436   }
437
438   public synchronized void shutdown()
439   {
440     log.info("shutdown initiated!");
441     shutdown = true;
442     stop();
443     log.info("closing consumer");
444     consumer.close();
445   }
446
447
448
449   public interface ConsumerUseCases
450       extends
451         GetTransferUseCase,
452         CreateTransferUseCase,
453         HandleStateChangeUseCase {};
454
455   public class Assignor extends CooperativeStickyAssignor
456   {
457     @Override
458     public GroupAssignment assign(
459         Cluster metadata,
460         GroupSubscription groupSubscription)
461     {
462       return super.assign(metadata, groupSubscription);
463     }
464
465     @Override
466     public ByteBuffer subscriptionUserData(Set<String> topics)
467     {
468       return null;
469     }
470
471     @Override
472     public void onAssignment(
473         Assignment assignment,
474         ConsumerGroupMetadata metadata)
475     {
476       log.info("New assignment: {}, {}", assignment, metadata);
477     }
478   }
479 }