Map<TopicPartition, Long> newOffsets;
+ /** Tests methods */
+
@Test
@Order(1) // << The poistion pill is not skipped. Hence, this test must run first
void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException
}
+ /** Helper methods for the verification of expectations */
+
+ void compareToCommitedOffsets(Map<TopicPartition, Long> offsetsToCheck)
+ {
+ doForCurrentOffsets((tp, offset) ->
+ {
+ Long expected = offsetsToCheck.get(tp) + 1;
+ log.debug("Checking, if the offset for {} is {}", tp, expected);
+ assertThat(offset).isEqualTo(expected);
+ });
+ }
+
+ void checkSeenOffsetsForProgress()
+ {
+ // Be sure, that some messages were consumed...!
+ Set<TopicPartition> withProgress = new HashSet<>();
+ partitions().forEach(tp ->
+ {
+ Long oldOffset = oldOffsets.get(tp);
+ Long newOffset = newOffsets.get(tp);
+ if (!oldOffset.equals(newOffset))
+ {
+ log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
+ withProgress.add(tp);
+ }
+ });
+ assertThat(withProgress).isNotEmpty().describedAs("Found no partitions with any offset-progress");
+ }
+
+
+ /** Helper methods for setting up and running the tests */
+
+ void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
+ {
+ offsetConsumer.assign(partitions());
+ partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
+ offsetConsumer.unsubscribe();
+ }
+
+ List<TopicPartition> partitions()
+ {
+ return
+ IntStream
+ .range(0, PARTITIONS)
+ .mapToObj(partition -> new TopicPartition(TOPIC, partition))
+ .collect(Collectors.toList());
+ }
+
+
void send100Messages(Function<Long, Bytes> messageGenerator)
{
long i = 0;
}
}
+
@BeforeEach
public void init()
{
endlessConsumer.start();
}
- List<TopicPartition> partitions()
- {
- return
- IntStream
- .range(0, PARTITIONS)
- .mapToObj(partition -> new TopicPartition(TOPIC, partition))
- .collect(Collectors.toList());
- }
-
- void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
- {
- offsetConsumer.assign(partitions());
- partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
- offsetConsumer.unsubscribe();
- }
-
- void checkSeenOffsetsForProgress()
- {
- // Be sure, that some messages were consumed...!
- Set<TopicPartition> withProgress = new HashSet<>();
- partitions().forEach(tp ->
- {
- Long oldOffset = oldOffsets.get(tp);
- Long newOffset = newOffsets.get(tp);
- if (!oldOffset.equals(newOffset))
- {
- log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
- withProgress.add(tp);
- }
- });
- assertThat(withProgress).isNotEmpty().describedAs("Found no partitions with any offset-progress");
- }
-
- void compareToCommitedOffsets(Map<TopicPartition, Long> offsetsToCheck)
- {
- doForCurrentOffsets((tp, offset) ->
- {
- Long expected = offsetsToCheck.get(tp) + 1;
- log.debug("Checking, if the offset for {} is {}", tp, expected);
- assertThat(offset).isEqualTo(expected);
- });
- }
-
-
@AfterEach
public void deinit()
{
}
}
+
@TestConfiguration
@Import(ApplicationConfiguration.class)
public static class Configuration