@Autowired
ExecutorService executor;
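+ // Offsets per partition before the test run (oldOffsets) and the last offset seen during the run (newOffsets)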
+ Map<TopicPartition, Long> oldOffsets;
+ Map<TopicPartition, Long> newOffsets;
+
@Test
@Order(1) // << The poison pill is not skipped. Hence, this test must run first
{
send100Messages(i -> new Bytes(longSerializer.serialize(TOPIC, i)));
- Map<TopicPartition, Long> oldOffsets = new HashMap<>();
- doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp, offset -1));
Set<ConsumerRecord<String, Long>> received = new HashSet<>();
- Map<TopicPartition, Long> newOffsets = runEndlessConsumer(record ->
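+ // Throw a WakeupException to end the consumption once all 100 records have been received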
+ runEndlessConsumer(record ->
{
received.add(record);
if (received.size() == 100)
throw new WakeupException();
});
- Set<TopicPartition> withProgress = new HashSet<>();
- partitions().forEach(tp ->
- {
- Long oldOffset = oldOffsets.get(tp);
- Long newOffset = newOffsets.get(tp);
- if (!oldOffset.equals(newOffset))
- {
- log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
- withProgress.add(tp);
- }
- });
- assertThat(withProgress).isNotEmpty().describedAs("Found no partitions with any offset-progress");
-
- check(newOffsets);
+ checkSeenOffsetsForProgress();
+ compareToCommitedOffsets(newOffsets);
}
@Test
? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
: new Bytes(longSerializer.serialize(TOPIC, counter)));
- Map<TopicPartition, Long> oldOffsets = new HashMap<>();
- doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp, offset -1));
- Map<TopicPartition, Long> newOffsets = runEndlessConsumer((record) -> {});
-
- Set<TopicPartition> withProgress = new HashSet<>();
- partitions().forEach(tp ->
- {
- Long oldOffset = oldOffsets.get(tp);
- Long newOffset = newOffsets.get(tp);
- if (!oldOffset.equals(newOffset))
- {
- log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
- withProgress.add(tp);
- }
- });
- assertThat(withProgress).isNotEmpty().describedAs("Found no partitions with any offset-progress");
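+ // Let the consumer run into the poison pill; afterwards the committed offsets must still be the ones from before the run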
+ runEndlessConsumer((record) -> {});
- check(oldOffsets);
+ checkSeenOffsetsForProgress();
+ compareToCommitedOffsets(oldOffsets);
}
}
}
- Map<TopicPartition, Long> runEndlessConsumer(Consumer<ConsumerRecord<String, Long>> consumer)
+ EndlessConsumer<String, Long> runEndlessConsumer(Consumer<ConsumerRecord<String, Long>> consumer)
{
- Map<TopicPartition, Long> offsets = new HashMap<>();
- doForCurrentOffsets((tp, offset) -> offsets.put(tp, offset -1));
+ oldOffsets = new HashMap<>();
+ newOffsets = new HashMap<>();
+
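+ // Start from the current offsets minus one, because a stored offset always points at the next record to be consumed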
+ doForCurrentOffsets((tp, offset) ->
+ {
+ oldOffsets.put(tp, offset - 1);
+ newOffsets.put(tp, offset - 1);
+ });
+
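+ // Track the offset of the last record seen for each partition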
Consumer<ConsumerRecord<String, Long>> captureOffset =
record ->
- offsets.put(
+ newOffsets.put(
new TopicPartition(record.topic(), record.partition()),
record.offset());
+
EndlessConsumer<String, Long> endlessConsumer =
new EndlessConsumer<>(
executor,
endlessConsumer.run();
- return offsets;
+ return endlessConsumer;
}
List<TopicPartition> partitions()
kafkaConsumer.unsubscribe();
}
- void check(Map<TopicPartition, Long> offsets)
+ void checkSeenOffsetsForProgress()
+ {
+ // Make sure that some messages were consumed!
+ Set<TopicPartition> withProgress = new HashSet<>();
+ partitions().forEach(tp ->
+ {
+ Long oldOffset = oldOffsets.get(tp);
+ Long newOffset = newOffsets.get(tp);
+ if (!oldOffset.equals(newOffset))
+ {
+ log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
+ withProgress.add(tp);
+ }
+ });
+ assertThat(withProgress).describedAs("Found no partitions with any offset-progress").isNotEmpty();
+ }
+
+ void compareToCommitedOffsets(Map<TopicPartition, Long> offsetsToCheck)
{
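+ // The committed offset must be exactly one above the offset recorded for the partition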
doForCurrentOffsets((tp, offset) ->
{
- Long expected = offsets.get(tp) + 1;
+ Long expected = offsetsToCheck.get(tp) + 1;
log.debug("Checking, if the offset for {} is {}", tp, expected);
assertThat(offset).isEqualTo(expected);
});