- return
- IntStream
- .range(0, PARTITIONS)
- .mapToObj(partition -> new TopicPartition(TOPIC, partition))
- .collect(Collectors.toList());
+ recordHandler.testHandler = (record) -> {};
+
+ oldOffsets = new HashMap<>();
+ newOffsets = new HashMap<>();
+ receivedRecords = new HashSet<>();
+
+ doForCurrentOffsets((tp, offset) ->
+ {
+ oldOffsets.put(tp, offset - 1);
+ newOffsets.put(tp, offset - 1);
+ });
+
+ recordHandler.captureOffsets =
+ record ->
+ {
+ receivedRecords.add(record);
+ log.debug("TEST: Processing record #{}: {}", receivedRecords.size(), record.value());
+ newOffsets.put(
+ new TopicPartition(record.topic(), record.partition()),
+ record.offset());
+ };
+
+ endlessConsumer.start();