- Consumer<ConsumerRecord<K, V>> captureOffsetAndExecuteTestHandler =
- record ->
- {
- seenOffsets.put(
- new TopicPartition(record.topic(), record.partition()),
- record.offset());
- receivedRecords.add(record);
- consumer.accept(record);
- };
-
- endlessConsumer =
- new EndlessConsumer<>(
- executor,
- properties.getClientId(),
- properties.getTopic(),
- kafkaConsumer,
- captureOffsetAndExecuteTestHandler);
-