The state is periodically stored in a local file, which is read on startup
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / adapter / TransferConsumer.java
1 package de.juplo.kafka.payment.transfer.adapter;
2
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
6 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
7 import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
8 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
9 import lombok.extern.slf4j.Slf4j;
10 import org.apache.kafka.clients.admin.AdminClient;
11 import org.apache.kafka.clients.admin.MemberDescription;
12 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
13 import org.apache.kafka.clients.consumer.ConsumerRecord;
14 import org.apache.kafka.clients.consumer.ConsumerRecords;
15 import org.apache.kafka.clients.consumer.KafkaConsumer;
16 import org.apache.kafka.common.TopicPartition;
17 import org.apache.kafka.common.errors.WakeupException;
18 import org.springframework.context.event.ContextRefreshedEvent;
19 import org.springframework.context.event.EventListener;
20 import org.springframework.web.bind.annotation.PostMapping;
21 import org.springframework.web.bind.annotation.RequestMapping;
22 import org.springframework.web.bind.annotation.ResponseBody;
23
24 import java.time.Clock;
25 import java.time.Duration;
26 import java.time.Instant;
27 import java.util.*;
28 import java.util.concurrent.CompletableFuture;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.Future;
31 import java.util.stream.Collectors;
32
33
34 @RequestMapping("/consumer")
35 @ResponseBody
36 @Slf4j
37 public class TransferConsumer implements Runnable, ConsumerRebalanceListener
38 {
39   private final String topic;
40   private final int numPartitions;
41   private final KafkaConsumer<String, String> consumer;
42   private final AdminClient adminClient;
43   private final TransferRepository repository;
44   private final ObjectMapper mapper;
45   private final ConsumerUseCases productionUseCases, restoreUseCases;
46
47   private boolean running = false;
48   private boolean shutdown = false;
49   private Future<?> future = null;
50
51   private final String groupId;
52   private final String groupInstanceId;
53   private final Map<String, String> instanceIdUriMapping;
54   private final String[] instanceIdByPartition;
55
56   private Clock clock;
57   private int stateStoreInterval;
58
59   private volatile boolean partitionOwnershipUnknown = true;
60
61
62   public TransferConsumer(
63       String topic,
64       int numPartitions,
65       Map<String, String> instanceIdUriMapping,
66       KafkaConsumer<String, String> consumer,
67       AdminClient adminClient,
68       TransferRepository repository,
69       Clock clock,
70       int stateStoreInterval,
71       ObjectMapper mapper,
72       ConsumerUseCases productionUseCases,
73       ConsumerUseCases restoreUseCases)
74   {
75     this.topic = topic;
76     this.numPartitions = numPartitions;
77     this.groupId = consumer.groupMetadata().groupId();
78     this.groupInstanceId = consumer.groupMetadata().groupInstanceId().get();
79     this.instanceIdByPartition = new String[numPartitions];
80     this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size());
81     for (String instanceId : instanceIdUriMapping.keySet())
82     {
83       // Requests are not redirected for the instance itself
84       String uri = instanceId.equals(groupInstanceId)
85           ? null
86           : instanceIdUriMapping.get(instanceId);
87       this.instanceIdUriMapping.put(instanceId, uri);
88     }
89     this.consumer = consumer;
90     this.adminClient = adminClient;
91     this.repository = repository;
92     this.clock = clock;
93     this.stateStoreInterval = stateStoreInterval;
94     this.mapper = mapper;
95     this.productionUseCases = productionUseCases;
96     this.restoreUseCases = restoreUseCases;
97   }
98
99
100   @Override
101   public void run()
102   {
103     Instant stateStored = clock.instant();
104
105     while (running)
106     {
107       try
108       {
109         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
110         if (records.count() == 0)
111           continue;
112
113         log.debug("polled {} records", records.count());
114         records.forEach(record -> handleRecord(record, productionUseCases));
115
116         Instant now = clock.instant();
117         if (
118             stateStoreInterval > 0 &&
119             Duration.between(stateStored, now).getSeconds() >= stateStoreInterval)
120         {
121           Map<Integer, Long> offsets = new HashMap<>();
122
123           for (TopicPartition topicPartition : consumer.assignment())
124           {
125             Integer partition = topicPartition.partition();
126             Long offset = consumer.position(topicPartition);
127             log.info("storing state locally for {}/{}: {}", topic, partition, offset);
128             offsets.put(partition, offset);
129           }
130
131           repository.storeState(offsets);
132           stateStored = now;
133         }
134       }
135       catch (WakeupException e)
136       {
137         log.info("cleanly interrupted while polling");
138       }
139     }
140
141     log.info("polling stopped");
142   }
143
144   private void handleRecord(ConsumerRecord<String, String> record, ConsumerUseCases useCases)
145   {
146     try
147     {
148       byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
149
150       switch (eventType)
151       {
152         case EventType.NEW_TRANSFER:
153
154           NewTransferEvent newTransferEvent =
155               mapper.readValue(record.value(), NewTransferEvent.class);
156           useCases
157               .create(
158                   newTransferEvent.getId(),
159                   newTransferEvent.getPayer(),
160                   newTransferEvent.getPayee(),
161                   newTransferEvent.getAmount());
162           break;
163
164         case EventType.TRANSFER_STATE_CHANGED:
165
166           TransferStateChangedEvent stateChangedEvent =
167               mapper.readValue(record.value(), TransferStateChangedEvent.class);
168           useCases.handleStateChange(stateChangedEvent.getId(), stateChangedEvent.getState());
169           break;
170       }
171     }
172     catch (JsonProcessingException e)
173     {
174       log.error(
175           "ignoring invalid json in message #{} on {}/{}: {}",
176           record.offset(),
177           record.topic(),
178           record.partition(),
179           record.value());
180     }
181     catch (IllegalArgumentException e)
182     {
183       log.error(
184           "ignoring invalid message #{} on {}/{}: {}, message={}",
185           record.offset(),
186           record.topic(),
187           record.partition(),
188           e.getMessage(),
189           record.value());
190     }
191   }
192
193
194   public Optional<String> uriForKey(String key)
195   {
196     synchronized (this)
197     {
198       while (partitionOwnershipUnknown)
199       {
200         try { wait(); } catch (InterruptedException e) {}
201       }
202
203       int partition = TransferPartitioner.computeHashForKey(key, numPartitions);
204       return
205           Optional
206               .ofNullable(instanceIdByPartition[partition])
207               .map(id -> instanceIdUriMapping.get(id));
208     }
209   }
210
211   @EventListener
212   public synchronized void onApplicationEvent(ContextRefreshedEvent event)
213   {
214     // "Needed", because this method is called synchronously during the
215     // initialization pahse of Spring. If the subscription happens
216     // in the same thread, it would block the completion of the initialization.
217     // Hence, the app would not react to any signal (CTRL-C, for example) except
218     // a KILL until the restoring is finished.
219     future = CompletableFuture.runAsync(() -> start());
220     log.info("start of application completed");
221   }
222
223
224   @Override
225   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
226   {
227     partitionOwnershipUnknown = true;
228     log.info("partitions revoked: {}", partitions);
229     for (TopicPartition topicPartition : partitions)
230     {
231       int partition = topicPartition.partition();
232       long offset = consumer.position(topicPartition);
233       log.info("deactivating partition {}, offset: {}", partition, offset);
234       repository.deactivatePartition(partition, offset);
235     }
236   }
237
238   @Override
239   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
240   {
241     log.info("partitions assigned: {}", partitions);
242     fetchAssignmentsAsync();
243     if (partitions.size() > 0)
244     {
245       for (TopicPartition topicPartition : partitions)
246       {
247         int partition = topicPartition.partition();
248         long offset = repository.activatePartition(partition);
249         log.info("activated partition {}, seeking to offset {}", partition, offset);
250         consumer.seek(topicPartition, offset);
251       }
252
253       restore(partitions);
254     }
255   }
256
257   private void fetchAssignmentsAsync()
258   {
259     adminClient
260         .describeConsumerGroups(List.of(groupId))
261         .describedGroups()
262         .get(groupId)
263         .whenComplete((descriptions, e) ->
264         {
265           if (e != null)
266           {
267             log.error("could not fetch group data: {}", e.getMessage());
268           }
269           else
270           {
271             synchronized (this)
272             {
273               for (MemberDescription description : descriptions.members())
274               {
275                 description
276                     .assignment()
277                     .topicPartitions()
278                     .forEach(tp -> instanceIdByPartition[tp.partition()] = description.groupInstanceId().get());
279               }
280               partitionOwnershipUnknown = false;
281               notifyAll();
282             }
283           }
284         });
285   }
286
287   @Override
288   public void onPartitionsLost(Collection<TopicPartition> partitions)
289   {
290     partitionOwnershipUnknown = true;
291     log.info("partiotions lost: {}", partitions);
292   }
293
294
295   private void restore(Collection<TopicPartition> partitions)
296   {
297     log.info("--> starting restore...");
298
299     Map<Integer, Long> lastSeen =
300         consumer
301             .endOffsets(partitions)
302             .entrySet()
303             .stream()
304             .collect(Collectors.toMap(
305                 entry -> entry.getKey().partition(),
306                 entry -> entry.getValue() - 1));
307
308     Map<Integer, Long> positions =
309         lastSeen
310             .keySet()
311             .stream()
312             .collect(Collectors.toMap(
313                 partition -> partition,
314                 partition -> repository.storedPosition(partition)));
315
316     while (
317         positions
318             .entrySet()
319             .stream()
320             .map(entry -> entry.getValue() < lastSeen.get(entry.getKey()))
321             .reduce(false, (a, b) -> a || b))
322     {
323       try
324       {
325         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
326         if (records.count() == 0)
327           continue;
328
329         log.debug("polled {} records", records.count());
330         records.forEach(record ->
331         {
332           handleRecord(record, restoreUseCases);
333           positions.put(record.partition(), record.offset());
334         });
335       }
336       catch(WakeupException e)
337       {
338         log.info("--> cleanly interrupted while restoring");
339       }
340     }
341
342     log.info("--> restore completed!");
343   }
344
345   @PostMapping("start")
346   public synchronized String start()
347   {
348     if (running)
349     {
350       log.info("consumer already running!");
351       return "Already running!";
352     }
353
354     int foundNumPartitions = consumer.partitionsFor(topic).size();
355     if (foundNumPartitions != numPartitions)
356     {
357       log.error(
358           "unexpected number of partitions for topic {}: expected={}, found={}",
359           topic,
360           numPartitions,
361           foundNumPartitions
362           );
363       return "Wrong number of partitions for topic " + topic + ": " + foundNumPartitions;
364     }
365
366     consumer.subscribe(List.of(topic), this);
367
368     running = true;
369     future = CompletableFuture.runAsync(this);
370
371     log.info("consumer started");
372     return "Started";
373   }
374
375   @PostMapping("stop")
376   public synchronized String stop()
377   {
378     if (!running)
379     {
380       log.info("consumer not running!");
381       return "Not running";
382     }
383
384     running = false;
385
386     if (!future.isDone())
387       consumer.wakeup();
388
389     log.info("waiting for the consumer...");
390     try
391     {
392       future.get();
393     }
394     catch (InterruptedException|ExecutionException e)
395     {
396       log.error("Exception while joining polling task!", e);
397       return e.getMessage();
398     }
399     finally
400     {
401       future = null;
402       consumer.unsubscribe();
403     }
404
405     log.info("consumer stopped");
406     return "Stopped";
407   }
408
409   public synchronized void shutdown()
410   {
411     log.info("shutdown initiated!");
412     shutdown = true;
413     stop();
414     log.info("closing consumer");
415     consumer.close();
416   }
417
418
419
420   public interface ConsumerUseCases
421       extends
422         GetTransferUseCase,
423         CreateTransferUseCase,
424         HandleStateChangeUseCase {};
425 }