X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTests.java;fp=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTests.java;h=b019373250d2c5c15062d43c4b262d5d7486f5e8;hb=d2eb370acf1a2195c36421ffc471f67cb4a8e86e;hp=93daf6ba490414763fbc76f460130272096d102d;hpb=600b0b10f8e98bebef80f75e391a78c459ffb45c;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 93daf6b..b019373 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -62,6 +62,8 @@ abstract class GenericApplicationTests @Autowired ExecutorService executor; @Autowired + StateRepository stateRepository; + @Autowired MongoClient mongoClient; @Autowired MongoProperties mongoProperties; @@ -228,23 +230,29 @@ abstract class GenericApplicationTests void seekToEnd() { offsetConsumer.assign(partitions()); - offsetConsumer.seekToEnd(partitions()); partitions().forEach(tp -> { - // seekToEnd() works lazily: it only takes effect on poll()/position() Long offset = offsetConsumer.position(tp); log.info("New position for {}: {}", tp, offset); + Integer partition = tp.partition(); + StateDocument document = + stateRepository + .findById(partition.toString()) + .orElse(new StateDocument(partition)); + document.offset = offset; + stateRepository.save(document); }); - // The new positions must be commited! - offsetConsumer.commitSync(); offsetConsumer.unsubscribe(); } void doForCurrentOffsets(BiConsumer consumer) { - offsetConsumer.assign(partitions()); - partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp))); - offsetConsumer.unsubscribe(); + partitions().forEach(tp -> + { + String partition = Integer.toString(tp.partition()); + Optional offset = stateRepository.findById(partition).map(document -> document.offset); + consumer.accept(tp, offset.orElse(0l)); + }); } List partitions() @@ -324,6 +332,7 @@ abstract class GenericApplicationTests props.put("value.deserializer", BytesDeserializer.class.getName()); offsetConsumer = new KafkaConsumer<>(props); + mongoClient.getDatabase(mongoProperties.getDatabase()).drop(); seekToEnd(); oldOffsets = new HashMap<>(); @@ -349,8 +358,6 @@ abstract class GenericApplicationTests } }; - mongoClient.getDatabase(mongoProperties.getDatabase()).drop(); - endlessConsumer = new EndlessConsumer<>( executor,