doForCurrentOffsets((tp, offset) ->
{
oldOffsets.put(tp, offset - 1);
- newOffsets.put(tp, offset - 1);
+ seenOffsets.put(tp, offset - 1);
});
- Consumer<ConsumerRecord<K, V>> captureOffsetAndExecuteTestHandler =
- record ->
+ TestRecordHandler<K, V> captureOffsetAndExecuteTestHandler =
+ new TestRecordHandler<K, V>(recordHandler)
{
- seenOffsets.put(
- new TopicPartition(record.topic(), record.partition()),
- record.offset());
- receivedRecords.add(record);
- consumer.accept(record);
+ @Override
+ public void onNewRecord(ConsumerRecord<K, V> record)
+ {
- newOffsets.put(
++ seenOffsets.put(
+ new TopicPartition(record.topic(), record.partition()),
+ record.offset());
+ receivedRecords.add(record);
+ }
};
endlessConsumer =