X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;h=4b7ef36f502fdc1946be4c028543b057329adc03;hb=e53004f3133b737699e995a3b18fff28203a0e8c;hp=caa25c54313ae2fd4b01e4820bcf135c62d57c5d;hpb=83a4bf324f5a7ec6010a7921118ec7d6e8f997cf;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index caa25c5..4b7ef36 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -63,7 +63,7 @@ class ApplicationTests @Autowired KafkaConsumer kafkaConsumer; @Autowired - KafkaConsumer offsetConsumer; + PartitionStatisticsRepository partitionStatisticsRepository; @Autowired ApplicationProperties properties; @Autowired @@ -177,9 +177,12 @@ class ApplicationTests void doForCurrentOffsets(BiConsumer consumer) { - offsetConsumer.assign(partitions()); - partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp))); - offsetConsumer.unsubscribe(); + partitions().forEach(tp -> + { + String partition = Integer.toString(tp.partition()); + Optional offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset); + consumer.accept(tp, offset.orElse(0l)); + }); } List partitions() @@ -306,18 +309,5 @@ class ApplicationTests return new KafkaProducer<>(props); } - - @Bean - KafkaConsumer offsetConsumer(ApplicationProperties properties) - { - Properties props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); - props.put("client.id", "OFFSET-CONSUMER"); - props.put("group.id", properties.getGroupId()); - props.put("key.deserializer", BytesDeserializer.class.getName()); - props.put("value.deserializer", BytesDeserializer.class.getName()); - - return new KafkaConsumer<>(props); - } } }