- Map<Integer, Long> offsets = new HashMap<>();
- doForCurrentOffsets((tp, offset) -> offsets.put(tp.partition(), offset -1));
- Consumer<ConsumerRecord<String, Long>> captureOffset = record -> offsets.put(record.partition(), record.offset());
- EndlessConsumer<String, Long> endlessConsumer =
+ oldOffsets = new HashMap<>();
+ newOffsets = new HashMap<>();
+ receivedRecords = new HashSet<>();
+
+ doForCurrentOffsets((tp, offset) ->
+ {
+ oldOffsets.put(tp, offset - 1);
+ newOffsets.put(tp, offset - 1);
+ });
+
+ TestRecordHandler<String, String> captureOffsetAndExecuteTestHandler =
+ new TestRecordHandler<String, String>(wordcountRecordHandler) {
+ @Override
+ public void onNewRecord(ConsumerRecord<String, String> record)
+ {
+ newOffsets.put(
+ new TopicPartition(record.topic(), record.partition()),
+ record.offset());
+ receivedRecords.add(record);
+ }
+ };
+
+ endlessConsumer =