- Map<Integer, Long> offsets = new HashMap<>();
- doForCurrentOffsets((tp, offset) -> offsets.put(tp.partition(), offset -1));
- Consumer<ConsumerRecord<String, Long>> captureOffset = record -> offsets.put(record.partition(), record.offset());
- EndlessConsumer<String, Long> endlessConsumer =
+ testHandler = record -> {} ;
+
+ oldOffsets = new HashMap<>();
+ newOffsets = new HashMap<>();
+ receivedRecords = new HashSet<>();
+
+ doForCurrentOffsets((tp, offset) ->
+ {
+ oldOffsets.put(tp, offset - 1);
+ newOffsets.put(tp, offset - 1);
+ });
+
+ Consumer<ConsumerRecord<String, String>> captureOffsetAndExecuteTestHandler =
+ record ->
+ {
+ newOffsets.put(
+ new TopicPartition(record.topic(), record.partition()),
+ record.offset());
+ receivedRecords.add(record);
+ testHandler.accept(record);
+ };
+
+ endlessConsumer =