- Map<TopicPartition, Long> runEndlessConsumer(Consumer<ConsumerRecord<String, Long>> consumer)
- {
- Map<TopicPartition, Long> offsets = new HashMap<>();
- doForCurrentOffsets((tp, offset) -> offsets.put(tp, offset -1));
- Consumer<ConsumerRecord<String, Long>> captureOffset =
- record ->
- offsets.put(
- new TopicPartition(record.topic(), record.partition()),
- record.offset());
- EndlessConsumer<String, Long> endlessConsumer =
- new EndlessConsumer<>(
- executor,
- properties.getClientId(),
- properties.getTopic(),
- kafkaConsumer,
- captureOffset.andThen(consumer));
-
- endlessConsumer.run();
-
- return offsets;
- }