+ private void handleRecord(ConsumerRecord<String, String> record, ConsumerUseCases useCases)
+ {
+ try
+ {
+ byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
+
+ switch (eventType)
+ {
+ case EventType.NEW_TRANSFER:
+
+ NewTransferEvent newTransferEvent =
+ mapper.readValue(record.value(), NewTransferEvent.class);
+ useCases.create(newTransferEvent.toTransfer().setState(CREATED));
+ break;
+
+ case EventType.TRANSFER_STATE_CHANGED:
+
+ TransferStateChangedEvent stateChangedEvent =
+ mapper.readValue(record.value(), TransferStateChangedEvent.class);
+ useCases
+ .get(stateChangedEvent.getId())
+ .ifPresentOrElse(
+ transfer -> useCases.handle(transfer.setState(stateChangedEvent.getState())),
+ () -> log.error("unknown transfer: {}", stateChangedEvent.getId()));
+ break;
+ }
+ }
+ catch (JsonProcessingException e)
+ {
+ log.error(
+ "ignoring invalid json in message #{} on {}/{}: {}",
+ record.offset(),
+ record.topic(),
+ record.partition(),
+ record.value());
+ }
+ }
+
+ @EventListener
+ public synchronized void onApplicationEvent(ContextRefreshedEvent event)
+ {
+ // Needed, because this method is called synchronously during the
+ // initialization pahse of Spring. If the restoring is processed
+ // in the same thread, it would block the completion of the initialization.
+ // Hence, the app would not react to any signal (CTRL-C, for example) except
+ // a KILL until the restoring is finished.
+ future = executorService.submit(() -> restore());
+ }
+
+ private void restore()
+ {
+ log.info("--> starting restore...");
+
+ List<TopicPartition> partitions =
+ consumer
+ .partitionsFor(topic)
+ .stream()
+ .map(info -> new TopicPartition(topic, info.partition()))
+ .collect(Collectors.toList());
+
+ Map<Integer, Long> lastSeen =
+ consumer
+ .endOffsets(partitions)
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(
+ entry -> entry.getKey().partition(),
+ entry -> entry.getValue() - 1));
+
+ Map<Integer, Long> positions =
+ lastSeen
+ .keySet()
+ .stream()
+ .collect(Collectors.toMap(
+ partition -> partition,
+ partition -> 0l));
+
+ log.info("assigning {}}", partitions);
+ consumer.assign(partitions);
+
+ while (
+ restoring &&
+ positions
+ .entrySet()
+ .stream()
+ .map(entry -> entry.getValue() < lastSeen.get(entry.getKey()))
+ .reduce(false, (a, b) -> a || b))
+ {
+ try
+ {
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
+ if (records.count() == 0)
+ continue;
+
+ log.debug("polled {} records", records.count());
+ records.forEach(record ->
+ {
+ handleRecord(record, restoreUseCases);
+ positions.put(record.partition(), record.offset());
+ });
+ }
+ catch(WakeupException e)
+ {
+ log.info("--> cleanly interrupted while restoring");
+ return;
+ }
+ }
+
+ log.info("--> restore completed!");
+ restoring = false;
+
+ // We are intentionally _not_ unsubscribing here, since that would
+ // reset the offset to _earliest_, because we disabled offset-commits.
+
+ start();
+ }