import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.stream.Collectors;
+import java.util.function.Consumer;
@RequestMapping("/consumer")
private final AdminClient adminClient;
private final TransferRepository repository;
private final ObjectMapper mapper;
- private final ConsumerUseCases productionUseCases, restoreUseCases;
+ private final ConsumerUseCases restoreUseCases;
private boolean running = false;
private boolean shutdown = false;
private final Map<String, String> instanceIdUriMapping;
private final String[] instanceIdByPartition;
+
private Clock clock;
private int stateStoreInterval;
+ private final Consumer<ConsumerRecord<String, String>> productionRecordHandler;
+ private final Consumer<ConsumerRecord<String, String>> recordHandlers[];
+
private volatile boolean partitionOwnershipUnknown = true;
this.clock = clock;
this.stateStoreInterval = stateStoreInterval;
this.mapper = mapper;
- this.productionUseCases = productionUseCases;
this.restoreUseCases = restoreUseCases;
+
+ productionRecordHandler = (record) -> handleRecord(record, productionUseCases);
+ this.recordHandlers = new Consumer[numPartitions];
}
continue;
log.debug("polled {} records", records.count());
- records.forEach(record -> handleRecord(record, productionUseCases));
+ records.forEach(record -> recordHandlers[record.partition()].accept(record));
Instant now = clock.instant();
if (
fetchAssignmentsAsync();
if (partitions.size() > 0)
{
- for (TopicPartition topicPartition : partitions)
+ for (Map.Entry<TopicPartition, Long> entry : consumer.endOffsets(partitions).entrySet())
{
- int partition = topicPartition.partition();
+ TopicPartition topicPartition = entry.getKey();
+ Integer partition = topicPartition.partition();
long offset = repository.activatePartition(partition);
log.info("activated partition {}, seeking to offset {}", partition, offset);
consumer.seek(topicPartition, offset);
+ Long endOffset = entry.getValue();
+ if (offset < endOffset)
+ {
+ log.info("--> starting restore of partition {}: {} -> {}", partition, offset, endOffset);
+ recordHandlers[partition] = new RestoreRecordHandler(endOffset);
+ }
+ else
+ {
+ log.info("--> partition {} is up-to-date, offset: {}", partition, offset);
+ recordHandlers[partition] = productionRecordHandler;
+ }
}
-
- restore(partitions);
}
}
}
- private void restore(Collection<TopicPartition> partitions)
+ class RestoreRecordHandler implements Consumer<ConsumerRecord<String, String>>
{
- log.info("--> starting restore...");
-
- Map<Integer, Long> lastSeen =
- consumer
- .endOffsets(partitions)
- .entrySet()
- .stream()
- .collect(Collectors.toMap(
- entry -> entry.getKey().partition(),
- entry -> entry.getValue() - 1));
-
- Map<Integer, Long> positions =
- lastSeen
- .keySet()
- .stream()
- .collect(Collectors.toMap(
- partition -> partition,
- partition -> repository.storedPosition(partition)));
-
- while (
- positions
- .entrySet()
- .stream()
- .map(entry -> entry.getValue() < lastSeen.get(entry.getKey()))
- .reduce(false, (a, b) -> a || b))
+ final long seen;
+
+
+ RestoreRecordHandler(Long endOffset)
{
- try
- {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
- if (records.count() == 0)
- continue;
+ this.seen = endOffset - 1;
+ }
- log.debug("polled {} records", records.count());
- records.forEach(record ->
- {
- handleRecord(record, restoreUseCases);
- positions.put(record.partition(), record.offset());
- });
+
+ @Override
+ public void accept(ConsumerRecord<String, String> record)
+ {
+ if (seen < record.offset())
+ {
+ int partition = record.partition();
+ log.info(
+ "--> restore of partition {} completed: needed={}, seen={}!",
+ partition,
+ seen,
+ record.offset());
+ recordHandlers[partition] = productionRecordHandler;
+ productionRecordHandler.accept(record);
}
- catch(WakeupException e)
+ else
{
- log.info("--> cleanly interrupted while restoring");
+ handleRecord(record, restoreUseCases);
+ if (seen == record.offset())
+ {
+ int partition = record.partition();
+ log.info( "--> restore of partition {} completed!", partition);
+ recordHandlers[partition] = productionRecordHandler;
+ }
}
}
-
- log.info("--> restore completed!");
}
+
@PostMapping("start")
public synchronized String start()
{