send100Messages(i -> new Bytes(longSerializer.serialize(TOPIC, i)));
Set<ConsumerRecord<String, Long>> received = new HashSet<>();
- Map<Integer, Long> offsets = runEndlessConsumer(record ->
+ Map<TopicPartition, Long> offsets = runEndlessConsumer(record ->
{
received.add(record);
if (received.size() == 100)
? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
: new Bytes(longSerializer.serialize(TOPIC, counter)));
- Map<Integer, Long> oldOffsets = new HashMap<>();
- doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp.partition(), offset -1));
- Map<Integer, Long> newOffsets = runEndlessConsumer((record) -> {});
+ Map<TopicPartition, Long> oldOffsets = new HashMap<>();
+ doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp, offset -1));
+ Map<TopicPartition, Long> newOffsets = runEndlessConsumer((record) -> {});
check(oldOffsets);
}
}
}
- Map<Integer, Long> runEndlessConsumer(Consumer<ConsumerRecord<String, Long>> consumer)
+ Map<TopicPartition, Long> runEndlessConsumer(Consumer<ConsumerRecord<String, Long>> consumer)
{
- Map<Integer, Long> offsets = new HashMap<>();
- doForCurrentOffsets((tp, offset) -> offsets.put(tp.partition(), offset -1));
- Consumer<ConsumerRecord<String, Long>> captureOffset = record -> offsets.put(record.partition(), record.offset());
+ Map<TopicPartition, Long> offsets = new HashMap<>();
+ doForCurrentOffsets((tp, offset) -> offsets.put(tp, offset -1));
+ Consumer<ConsumerRecord<String, Long>> captureOffset =
+ record ->
+ offsets.put(
+ new TopicPartition(record.topic(), record.partition()),
+ record.offset());
EndlessConsumer<String, Long> endlessConsumer =
new EndlessConsumer<>(
executor,
kafkaConsumer.unsubscribe();
}
- void check(Map<Integer, Long> offsets)
+ void check(Map<TopicPartition, Long> offsets)
{
doForCurrentOffsets((tp, offset) ->
{
- Long expected = offsets.get(tp.partition()) + 1;
+ Long expected = offsets.get(tp) + 1;
log.debug("Checking, if the offset for {} is {}", tp, expected);
assertThat(offset).isEqualTo(expected);
});