+ /** Helper methods for setting up and running the tests */
+
+ void seekToEnd()
+ {
+ offsetConsumer.assign(partitions());
+ offsetConsumer.seekToEnd(partitions());
+ partitions().forEach(tp ->
+ {
+ // seekToEnd() works lazily: it only takes effect on poll()/position()
+ Long offset = offsetConsumer.position(tp);
+ log.info("New position for {}: {}", tp, offset);
+ });
+ // The new positions must be commited!
+ offsetConsumer.commitSync();
+ offsetConsumer.unsubscribe();
+ }
+
+ void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
+ {
+ offsetConsumer.assign(partitions());
+ partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
+ offsetConsumer.unsubscribe();
+ }
+
+ List<TopicPartition> partitions()
+ {
+ return
+ IntStream
+ .range(0, PARTITIONS)
+ .mapToObj(partition -> new TopicPartition(TOPIC, partition))
+ .collect(Collectors.toList());