- Map<TopicPartition, Long> offsets = new HashMap<>();
- doForCurrentOffsets((tp, offset) -> offsets.put(tp, offset -1));
- Consumer<ConsumerRecord<String, Long>> captureOffset =
- record ->
- offsets.put(
+ oldOffsets = new HashMap<>();
+ newOffsets = new HashMap<>();
+ receivedRecords = new HashSet<>();
+
+ doForCurrentOffsets((tp, offset) ->
+ {
+ oldOffsets.put(tp, offset - 1);
+ newOffsets.put(tp, offset - 1);
+ });
+
+ TestRecordHandler<String, String> captureOffsetAndExecuteTestHandler =
+ new TestRecordHandler<String, String>(wordcountRecordHandler) {
+ @Override
+ public void onNewRecord(ConsumerRecord<String, String> record)
+ {
+ newOffsets.put(