X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;h=09614b8e23702f729214dd33bc7820ad3c83208d;hb=c808810e9e33afe33b29f7fd3921023ecd15483d;hp=f4c21041bf7c8febdb1b14abc77efc96c1be139b;hpb=818c1eb862247e25abf9f7d91d5a73e3e3789a39;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index f4c2104..09614b8 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -20,13 +20,11 @@ import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; -import java.time.Clock; import java.time.Duration; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -63,6 +61,8 @@ class ApplicationTests @Autowired KafkaConsumer kafkaConsumer; @Autowired + KafkaConsumer offsetConsumer; + @Autowired PartitionStatisticsRepository partitionStatisticsRepository; @Autowired ApplicationProperties properties; @@ -71,9 +71,9 @@ class ApplicationTests @Autowired PartitionStatisticsRepository repository; @Autowired - WordcountRebalanceListener wordcountRebalanceListener; + SumRebalanceListener sumRebalanceListener; @Autowired - WordcountRecordHandler wordcountRecordHandler; + SumRecordHandler sumRecordHandler; EndlessConsumer endlessConsumer; Map oldOffsets; @@ -86,14 +86,20 @@ class ApplicationTests @Test void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException { - send100Messages(i -> new Bytes(valueSerializer.serialize(TOPIC, i))); + send100Messages((partition, key, counter) -> + { + Bytes value = new Bytes(valueSerializer.serialize(TOPIC, counter)); + return new ProducerRecord<>(TOPIC, partition, key, value); + }); await("100 records received") .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) .until(() -> receivedRecords.size() >= 100); await("Offsets committed") .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofSeconds(1)) .untilAsserted(() -> { checkSeenOffsetsForProgress(); @@ -126,8 +132,8 @@ class ApplicationTests Set withProgress = new HashSet<>(); partitions().forEach(tp -> { - Long oldOffset = oldOffsets.get(tp); - Long newOffset = newOffsets.get(tp); + Long oldOffset = oldOffsets.get(tp) + 1; + Long newOffset = newOffsets.get(tp) + 1; if (!oldOffset.equals(newOffset)) { log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); @@ -142,6 +148,24 @@ class ApplicationTests /** Helper methods for setting up and running the tests */ + void seekToEnd() + { + offsetConsumer.assign(partitions()); + partitions().forEach(tp -> + { + Long offset = offsetConsumer.position(tp); + log.info("New position for {}: {}", tp, offset); + Integer partition = tp.partition(); + StatisticsDocument document = + partitionStatisticsRepository + .findById(partition.toString()) + .orElse(new StatisticsDocument(partition)); + document.offset = offset; + partitionStatisticsRepository.save(document); + }); + offsetConsumer.unsubscribe(); + } + void doForCurrentOffsets(BiConsumer consumer) { partitions().forEach(tp -> @@ -162,7 +186,12 @@ class ApplicationTests } - void send100Messages(Function messageGenerator) + public interface RecordGenerator + { + public ProducerRecord generate(int partition, String key, long counter); + } + + void send100Messages(RecordGenerator recordGenerator) { long i = 0; @@ -170,14 +199,8 @@ class ApplicationTests { for (int key = 0; key < 10; key++) { - Bytes value = messageGenerator.apply(++i); - ProducerRecord record = - new ProducerRecord<>( - TOPIC, - partition, - Integer.toString(key%2), - value); + recordGenerator.generate(partition, Integer.toString(partition*10+key%2), ++i); kafkaProducer.send(record, (metadata, e) -> { @@ -207,6 +230,8 @@ class ApplicationTests @BeforeEach public void init() { + seekToEnd(); + oldOffsets = new HashMap<>(); newOffsets = new HashMap<>(); receivedRecords = new HashSet<>(); @@ -218,7 +243,7 @@ class ApplicationTests }); TestRecordHandler captureOffsetAndExecuteTestHandler = - new TestRecordHandler(wordcountRecordHandler) { + new TestRecordHandler(sumRecordHandler) { @Override public void onNewRecord(ConsumerRecord record) { @@ -235,7 +260,7 @@ class ApplicationTests properties.getClientId(), properties.getTopic(), kafkaConsumer, - wordcountRebalanceListener, + sumRebalanceListener, captureOffsetAndExecuteTestHandler); endlessConsumer.start(); @@ -276,5 +301,19 @@ class ApplicationTests return new KafkaProducer<>(props); } + + @Bean + KafkaConsumer offsetConsumer(ApplicationProperties properties) + { + Properties props = new Properties(); + props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("client.id", "OFFSET-CONSUMER"); + props.put("enable.auto.commit", false); + props.put("auto.offset.reset", "latest"); + props.put("key.deserializer", BytesDeserializer.class.getName()); + props.put("value.deserializer", BytesDeserializer.class.getName()); + + return new KafkaConsumer<>(props); + } } }