+ private void handleRecord(ConsumerRecord<String, String> record, ConsumerUseCases useCases)
+ {
+ try
+ {
+ byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
+
+ switch (eventType)
+ {
+ case EventType.NEW_TRANSFER:
+
+ NewTransferEvent newTransferEvent =
+ mapper.readValue(record.value(), NewTransferEvent.class);
+ useCases
+ .create(
+ newTransferEvent.getId(),
+ newTransferEvent.getPayer(),
+ newTransferEvent.getPayee(),
+ newTransferEvent.getAmount());
+ break;
+
+ case EventType.TRANSFER_STATE_CHANGED:
+
+ TransferStateChangedEvent stateChangedEvent =
+ mapper.readValue(record.value(), TransferStateChangedEvent.class);
+ useCases.handleStateChange(stateChangedEvent);
+ break;
+ }
+ }
+ catch (JsonProcessingException e)
+ {
+ log.error(
+ "ignoring invalid json in message #{} on {}/{}: {}",
+ record.offset(),
+ record.topic(),
+ record.partition(),
+ record.value());
+ }
+ catch (IllegalArgumentException e)
+ {
+ log.error(
+ "ignoring invalid message #{} on {}/{}: {}, message={}",
+ record.offset(),
+ record.topic(),
+ record.partition(),
+ e.getMessage(),
+ record.value());
+ }
+ }
+
+
+ /**
+ * Identifies the URI, at which the Group-Instance can be reached,
+ * that holds the state for a specific {@link Transfer}.
+ *
+ * The {@link Transfer#getId() ID} of the {@link Transfer} is named
+ * {@code key} here and of type {@code String}, because this example
+ * project stores the key as a String in Kafka to simplify the listing
+ * and manual manipulation of the according topic.
+ *
+ * @param key A {@code String}, that represents the {@link Transfer#getId() ID} of a {@link Transfer}.
+ * @return An {@link Optional}, that holds the URI at which the Group-Instance
+ * can be reached, that holds the state for the {@link Transfer}, that
+ * is identified by the key (if present), or is empty, if the {@link Transfer}
+ * would be handled by the local instance.
+ */
+ public Optional<String> uriForKey(String key)
+ {
+ synchronized (this)
+ {
+ while (partitionOwnershipUnknown)
+ {
+ try { wait(); } catch (InterruptedException e) {}
+ }
+
+ int partition = TransferPartitioner.computeHashForKey(key, numPartitions);
+ return
+ Optional
+ .ofNullable(instanceIdByPartition[partition])
+ .map(id -> instanceIdUriMapping.get(id));
+ }
+ }
+
+ @EventListener
+ public synchronized void onApplicationEvent(ContextRefreshedEvent event)
+ {
+ // "Needed", because this method is called synchronously during the
+ // initialization pahse of Spring. If the subscription happens
+ // in the same thread, it would block the completion of the initialization.
+ // Hence, the app would not react to any signal (CTRL-C, for example) except
+ // a KILL until the restoring is finished.
+ future = CompletableFuture.runAsync(() -> start());
+ log.info("start of application completed");
+ }
+
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
+ partitionOwnershipUnknown = true;
+ log.info("partitions revoked: {}", partitions);
+ for (TopicPartition topicPartition : partitions)
+ {
+ int partition = topicPartition.partition();
+ long offset = consumer.position(topicPartition);
+ log.info("deactivating partition {}, offset: {}", partition, offset);
+ repository.deactivatePartition(partition, offset);
+ }
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ {
+ log.info("partitions assigned: {}", partitions);
+ fetchAssignmentsAsync();
+ if (partitions.size() > 0)
+ {
+ for (Map.Entry<TopicPartition, Long> entry : consumer.endOffsets(partitions).entrySet())
+ {
+ TopicPartition topicPartition = entry.getKey();
+ Integer partition = topicPartition.partition();
+ long offset = repository.activatePartition(partition);
+ log.info("activated partition {}, seeking to offset {}", partition, offset);
+ consumer.seek(topicPartition, offset);
+ Long endOffset = entry.getValue();
+ if (offset < endOffset)
+ {
+ log.info("--> starting restore of partition {}: {} -> {}", partition, offset, endOffset);
+ recordHandlers[partition] = new RestoreRecordHandler(endOffset);
+ }
+ else
+ {
+ log.info("--> partition {} is up-to-date, offset: {}", partition, offset);
+ recordHandlers[partition] = productionRecordHandler;
+ }
+ }
+ }
+ }
+
+ private void fetchAssignmentsAsync()
+ {
+ adminClient
+ .describeConsumerGroups(List.of(groupId))
+ .describedGroups()
+ .get(groupId)
+ .whenComplete((descriptions, e) ->
+ {
+ if (e != null)
+ {
+ log.error("could not fetch group data: {}", e.getMessage());
+ }
+ else
+ {
+ synchronized (this)
+ {
+ for (MemberDescription description : descriptions.members())
+ {
+ description
+ .assignment()
+ .topicPartitions()
+ .forEach(tp -> instanceIdByPartition[tp.partition()] = description.groupInstanceId().get());
+ }
+ partitionOwnershipUnknown = false;
+ notifyAll();
+ }
+ }
+ });
+ }
+
+ @Override
+ public void onPartitionsLost(Collection<TopicPartition> partitions)
+ {
+ partitionOwnershipUnknown = true;
+ log.info("partiotions lost: {}", partitions);
+ }
+
+
+ class RestoreRecordHandler implements Consumer<ConsumerRecord<String, String>>
+ {
+ final long seen;
+
+
+ RestoreRecordHandler(Long endOffset)
+ {
+ this.seen = endOffset - 1;
+ }
+
+
+ @Override
+ public void accept(ConsumerRecord<String, String> record)
+ {
+ if (seen < record.offset())
+ {
+ int partition = record.partition();
+ log.info(
+ "--> restore of partition {} completed: needed={}, seen={}!",
+ partition,
+ seen,
+ record.offset());
+ recordHandlers[partition] = productionRecordHandler;
+ productionRecordHandler.accept(record);
+ }
+ else
+ {
+ handleRecord(record, restoreUseCases);
+ if (seen == record.offset())
+ {
+ int partition = record.partition();
+ log.info( "--> restore of partition {} completed!", partition);
+ recordHandlers[partition] = productionRecordHandler;
+ }
+ }
+ }
+ }
+