1 package de.juplo.kafka.payment.transfer.adapter;
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
6 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
7 import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
8 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
9 import lombok.extern.slf4j.Slf4j;
10 import org.apache.kafka.clients.admin.AdminClient;
11 import org.apache.kafka.clients.admin.MemberDescription;
12 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
13 import org.apache.kafka.clients.consumer.ConsumerRecord;
14 import org.apache.kafka.clients.consumer.ConsumerRecords;
15 import org.apache.kafka.clients.consumer.KafkaConsumer;
16 import org.apache.kafka.common.TopicPartition;
17 import org.apache.kafka.common.errors.WakeupException;
18 import org.springframework.context.event.ContextRefreshedEvent;
19 import org.springframework.context.event.EventListener;
20 import org.springframework.web.bind.annotation.PostMapping;
21 import org.springframework.web.bind.annotation.RequestMapping;
22 import org.springframework.web.bind.annotation.ResponseBody;
24 import java.time.Clock;
25 import java.time.Duration;
26 import java.time.Instant;
28 import java.util.concurrent.CompletableFuture;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.Future;
31 import java.util.stream.Collectors;
// Adapter that consumes transfer events from a Kafka topic and exposes
// REST endpoints (mapped under "/consumer") to start/stop consumption.
// Implements ConsumerRebalanceListener to track which group instance
// currently owns which partition.
// NOTE(review): further class-level annotations (e.g. @Slf4j for the
// lombok "log" field used below) and the opening brace are not visible
// in this excerpt.
34 @RequestMapping("/consumer")
37 public class TransferConsumer implements Runnable, ConsumerRebalanceListener
// Topic to consume and the partition count the application is configured for.
39 private final String topic;
40 private final int numPartitions;
41 private final KafkaConsumer<String, String> consumer;
42 private final AdminClient adminClient;
// Local store for transfer state and per-partition offsets/positions.
43 private final TransferRepository repository;
// JSON (de-)serialization of event payloads.
44 private final ObjectMapper mapper;
// Use-case bundles: one for live processing, one for replay/state restore.
45 private final ConsumerUseCases productionUseCases, restoreUseCases;
// Mutated only inside synchronized start()/stop()/shutdown().
47 private boolean running = false;
48 private boolean shutdown = false;
// Handle of the asynchronously running poll loop (see start()).
49 private Future<?> future = null;
// Static group membership: consumer group id and this instance's id.
51 private final String groupId;
52 private final String groupInstanceId;
// Maps a group instance id to the URI requests should be redirected to.
53 private final Map<String, String> instanceIdUriMapping;
// For each partition: instance id of the current owner (null = unknown).
54 private final String[] instanceIdByPartition;
// Interval (seconds) between local state snapshots; <= 0 disables them.
57 private int stateStoreInterval;
// true while a rebalance is in flight and the ownership mapping is stale;
// readers of instanceIdByPartition wait on this flag (see uriForKey()).
59 private volatile boolean partitionOwnershipUnknown = true;
/**
 * Wires up the consumer and precomputes the instance-id-to-URI mapping.
 * NOTE(review): some parameters and assignments (e.g. topic, mapper,
 * clock) are not visible in this excerpt; the constructor is partial.
 */
62 public TransferConsumer(
65 Map<String, String> instanceIdUriMapping,
66 KafkaConsumer<String, String> consumer,
67 AdminClient adminClient,
68 TransferRepository repository,
70 int stateStoreInterval,
72 ConsumerUseCases productionUseCases,
73 ConsumerUseCases restoreUseCases)
76 this.numPartitions = numPartitions;
// Group data is read from the consumer's own group metadata.
77 this.groupId = consumer.groupMetadata().groupId();
// NOTE(review): unchecked Optional.get() - presumably group.instance.id
// (static membership) is always configured for this app; verify.
78 this.groupInstanceId = consumer.groupMetadata().groupInstanceId().get();
79 this.instanceIdByPartition = new String[numPartitions];
80 this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size());
81 for (String instanceId : instanceIdUriMapping.keySet())
83 // Requests are not redirected for the instance itself
84 String uri = instanceId.equals(groupInstanceId)
86 : instanceIdUriMapping.get(instanceId);
87 this.instanceIdUriMapping.put(instanceId, uri);
89 this.consumer = consumer;
90 this.adminClient = adminClient;
91 this.repository = repository;
93 this.stateStoreInterval = stateStoreInterval;
95 this.productionUseCases = productionUseCases;
96 this.restoreUseCases = restoreUseCases;
// Poll-loop body (part of run(); the method signature and the enclosing
// loop construct are not visible in this excerpt).
103 Instant stateStored = clock.instant();
// Poll with a 1s timeout so the loop stays responsive to wakeup()/stop.
109 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
110 if (records.count() == 0)
113 log.debug("polled {} records", records.count());
// Live records are processed with the production use-cases.
114 records.forEach(record -> handleRecord(record, productionUseCases));
116 Instant now = clock.instant();
// Periodically snapshot the current position of every assigned partition
// into the local repository (disabled when stateStoreInterval <= 0).
118 stateStoreInterval > 0 &&
119 Duration.between(stateStored, now).getSeconds() >= stateStoreInterval)
121 Map<Integer, Long> offsets = new HashMap<>();
123 for (TopicPartition topicPartition : consumer.assignment())
125 Integer partition = topicPartition.partition();
126 Long offset = consumer.position(topicPartition);
127 log.info("storing state locally for {}/{}: {}", topic, partition, offset);
128 offsets.put(partition, offset);
131 repository.storeState(offsets);
// consumer.wakeup() is the intended way to break out of poll(); treat the
// resulting WakeupException as a clean shutdown signal, not an error.
135 catch (WakeupException e)
137 log.info("cleanly interrupted while polling");
141 log.info("polling stopped");
// Dispatches a single record to the given use-cases based on the
// event-type header. Malformed or invalid messages are logged and skipped
// so that one poison pill cannot stall the partition.
// NOTE(review): the switch scaffolding and log.error() argument lists are
// only partially visible in this excerpt.
144 private void handleRecord(ConsumerRecord<String, String> record, ConsumerUseCases useCases)
// First byte of the EventType header selects the payload type.
// NOTE(review): this throws NPE if the header is missing - presumably all
// producers set it; verify.
148 byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
152 case EventType.NEW_TRANSFER:
154 NewTransferEvent newTransferEvent =
155 mapper.readValue(record.value(), NewTransferEvent.class);
158 newTransferEvent.getId(),
159 newTransferEvent.getPayer(),
160 newTransferEvent.getPayee(),
161 newTransferEvent.getAmount());
164 case EventType.TRANSFER_STATE_CHANGED:
166 TransferStateChangedEvent stateChangedEvent =
167 mapper.readValue(record.value(), TransferStateChangedEvent.class);
168 useCases.handleStateChange(stateChangedEvent.getId(), stateChangedEvent.getState());
// Invalid JSON payloads are logged and ignored (best-effort processing).
172 catch (JsonProcessingException e)
175 "ignoring invalid json in message #{} on {}/{}: {}",
181 catch (IllegalArgumentException e)
184 "ignoring invalid message #{} on {}/{}: {}, message={}",
194 public Optional<String> uriForKey(String key)
198 while (partitionOwnershipUnknown)
200 try { wait(); } catch (InterruptedException e) {}
203 int partition = TransferPartitioner.computeHashForKey(key, numPartitions);
206 .ofNullable(instanceIdByPartition[partition])
207 .map(id -> instanceIdUriMapping.get(id));
/**
 * Starts the consumer asynchronously once the Spring context is ready.
 */
212 public synchronized void onApplicationEvent(ContextRefreshedEvent event)
214 // "Needed", because this method is called synchronously during the
215 // initialization phase of Spring. If the subscription happens
216 // in the same thread, it would block the completion of the initialization.
217 // Hence, the app would not react to any signal (CTRL-C, for example) except
218 // a KILL until the restoring is finished.
219 future = CompletableFuture.runAsync(() -> start());
220 log.info("start of application completed");
// Rebalance callback: partitions are being taken away in an orderly
// revocation. Mark the ownership mapping as stale and persist the current
// position of each partition so a later activatePartition() can resume
// from exactly this point.
225 public void onPartitionsRevoked(Collection<TopicPartition> partitions)
227 partitionOwnershipUnknown = true;
228 log.info("partitions revoked: {}", partitions);
229 for (TopicPartition topicPartition : partitions)
231 int partition = topicPartition.partition();
// position() is the offset of the next record that would be fetched.
232 long offset = consumer.position(topicPartition);
233 log.info("deactivating partition {}, offset: {}", partition, offset);
234 repository.deactivatePartition(partition, offset);
// Rebalance callback: a new assignment was received. Refresh the
// partition-to-instance mapping (for ALL group members, via AdminClient)
// and seek each newly assigned partition to its locally stored position.
239 public void onPartitionsAssigned(Collection<TopicPartition> partitions)
241 log.info("partitions assigned: {}", partitions);
242 fetchAssignmentsAsync();
243 if (partitions.size() > 0)
245 for (TopicPartition topicPartition : partitions)
247 int partition = topicPartition.partition();
// activatePartition() yields the offset processing should resume from.
248 long offset = repository.activatePartition(partition);
249 log.info("activated partition {}, seeking to offset {}", partition, offset);
250 consumer.seek(topicPartition, offset);
// Asynchronously queries the consumer group via the AdminClient and
// records, for every assigned topic-partition, the group instance id of
// the member that owns it.
257 private void fetchAssignmentsAsync()
260 .describeConsumerGroups(List.of(groupId))
263 .whenComplete((descriptions, e) ->
// Best-effort: on failure only log; the mapping stays stale.
267 log.error("could not fetch group data: {}", e.getMessage());
273 for (MemberDescription description : descriptions.members())
// NOTE(review): unchecked Optional.get() on groupInstanceId - assumes all
// group members use static membership; verify.
278 .forEach(tp -> instanceIdByPartition[tp.partition()] = description.groupInstanceId().get());
// Ownership is known again; waiters in uriForKey() may proceed.
// NOTE(review): no notify()/notifyAll() is visible in this excerpt -
// confirm how threads blocked in uriForKey() are woken up.
280 partitionOwnershipUnknown = false;
288 public void onPartitionsLost(Collection<TopicPartition> partitions)
290 partitionOwnershipUnknown = true;
291 log.info("partiotions lost: {}", partitions);
// Replays the given partitions from their locally stored positions up to
// the current end of the topic, using the restore use-cases, in order to
// rebuild the local state before live processing starts.
295 private void restore(Collection<TopicPartition> partitions)
297 log.info("--> starting restore...");
// Offset of the last record currently present in each partition
// (endOffsets() returns the NEXT offset to be written, hence the -1).
299 Map<Integer, Long> lastSeen =
301 .endOffsets(partitions)
304 .collect(Collectors.toMap(
305 entry -> entry.getKey().partition(),
306 entry -> entry.getValue() - 1));
// Per-partition position the restore starts from, as stored locally.
308 Map<Integer, Long> positions =
312 .collect(Collectors.toMap(
313 partition -> partition,
314 partition -> repository.storedPosition(partition)));
// Loop condition: keep polling while ANY partition is still behind
// its lastSeen offset (an OR-reduction over all partitions).
320 .map(entry -> entry.getValue() < lastSeen.get(entry.getKey()))
321 .reduce(false, (a, b) -> a || b))
325 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
326 if (records.count() == 0)
329 log.debug("polled {} records", records.count());
330 records.forEach(record ->
332 handleRecord(record, restoreUseCases);
// Track progress per partition to drive the loop condition above.
333 positions.put(record.partition(), record.offset());
// wakeup() during restore is treated as a clean abort, not an error.
336 catch(WakeupException e)
338 log.info("--> cleanly interrupted while restoring");
342 log.info("--> restore completed!");
// REST endpoint (POST /consumer/start) and internal entry point: verifies
// the topic's partition count, subscribes to the topic and launches the
// poll loop asynchronously. Returns a human-readable status string.
345 @PostMapping("start")
346 public synchronized String start()
350 log.info("consumer already running!");
351 return "Already running!";
// Guard against misconfiguration: instanceIdByPartition and the key
// hashing in uriForKey() both depend on numPartitions being correct.
354 int foundNumPartitions = consumer.partitionsFor(topic).size();
355 if (foundNumPartitions != numPartitions)
358 "unexpected number of partitions for topic {}: expected={}, found={}",
363 return "Wrong number of partitions for topic " + topic + ": " + foundNumPartitions;
// Subscribe with this instance as rebalance listener, so assignment
// changes are tracked (see onPartitionsAssigned/Revoked/Lost).
366 consumer.subscribe(List.of(topic), this);
369 future = CompletableFuture.runAsync(this);
371 log.info("consumer started");
// Counterpart to start(): interrupts the poll loop, waits for the polling
// task to finish and unsubscribes. Returns a human-readable status string.
376 public synchronized String stop()
380 log.info("consumer not running!");
381 return "Not running";
386 if (!future.isDone())
389 log.info("waiting for the consumer...");
// NOTE(review): on InterruptedException the interrupt flag is not
// restored here - the exception is only logged and its message returned.
394 catch (InterruptedException|ExecutionException e)
396 log.error("Exception while joining polling task!", e);
397 return e.getMessage();
402 consumer.unsubscribe();
405 log.info("consumer stopped");
// Final teardown: stops the poll loop and closes the Kafka consumer.
// NOTE(review): the statements between these log lines (presumably
// stop() and consumer.close()) are not visible in this excerpt.
409 public synchronized void shutdown()
411 log.info("shutdown initiated!");
414 log.info("closing consumer");
// Aggregate of the use-case ports the consumer needs, so handleRecord()
// can be driven by a single collaborator (one bundle for live processing,
// one for state restore - see the productionUseCases/restoreUseCases
// fields).
// NOTE(review): the extends clause is only partially visible in this
// excerpt; GetTransferUseCase is imported but not listed here - verify.
420 public interface ConsumerUseCases
423 CreateTransferUseCase,
424 HandleStateChangeUseCase {};