X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;h=6aaff523c7071232bd2264fb0f6bca1b1c44cd0f;hb=475d3b4149a49568680ee1f6d6f4a1b7a45845df;hp=bf1cdb8515a30053b64bfec64662809c8fa69d43;hpb=f6212abfac1d872979d2a27f5a6bf4708b643db6;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index bf1cdb8..6aaff52 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -63,6 +63,9 @@ class ApplicationTests @Autowired ExecutorService executor; + Map oldOffsets; + Map newOffsets; + @Test @Order(1) // << The poistion pill is not skipped. Hence, this test must run first @@ -70,30 +73,16 @@ class ApplicationTests { send100Messages(i -> new Bytes(longSerializer.serialize(TOPIC, i))); - Map oldOffsets = new HashMap<>(); - doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp, offset -1)); Set> received = new HashSet<>(); - Map newOffsets = runEndlessConsumer(record -> + runEndlessConsumer(record -> { received.add(record); if (received.size() == 100) throw new WakeupException(); }); - Set withProgress = new HashSet<>(); - partitions().forEach(tp -> - { - Long oldOffset = oldOffsets.get(tp); - Long newOffset = newOffsets.get(tp); - if (!oldOffset.equals(newOffset)) - { - log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); - withProgress.add(tp); - } - }); - assertThat(withProgress).isNotEmpty().describedAs("Found no partitions with any offset-progress"); - - check(newOffsets); + checkSeenOffsetsForProgress(); + compareToCommitedOffsets(newOffsets); } @Test @@ -105,24 +94,10 @@ class ApplicationTests ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!")) : new Bytes(longSerializer.serialize(TOPIC, counter))); - Map oldOffsets = new HashMap<>(); - doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp, offset -1)); - Map newOffsets = runEndlessConsumer((record) -> {}); - - Set withProgress = new HashSet<>(); - partitions().forEach(tp -> - { - Long oldOffset = oldOffsets.get(tp); - Long newOffset = newOffsets.get(tp); - if (!oldOffset.equals(newOffset)) - { - log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); - withProgress.add(tp); - } - }); - assertThat(withProgress).isNotEmpty().describedAs("Found no partitions with any offset-progress"); + runEndlessConsumer((record) -> {}); - check(oldOffsets); + checkSeenOffsetsForProgress(); + compareToCommitedOffsets(oldOffsets); } @@ -167,15 +142,23 @@ class ApplicationTests } } - Map runEndlessConsumer(Consumer> consumer) + EndlessConsumer runEndlessConsumer(Consumer> consumer) { - Map offsets = new HashMap<>(); - doForCurrentOffsets((tp, offset) -> offsets.put(tp, offset -1)); + oldOffsets = new HashMap<>(); + newOffsets = new HashMap<>(); + + doForCurrentOffsets((tp, offset) -> + { + oldOffsets.put(tp, offset - 1); + newOffsets.put(tp, offset - 1); + }); + Consumer> captureOffset = record -> - offsets.put( + newOffsets.put( new TopicPartition(record.topic(), record.partition()), record.offset()); + EndlessConsumer endlessConsumer = new EndlessConsumer<>( executor, @@ -186,7 +169,7 @@ class ApplicationTests endlessConsumer.run(); - return offsets; + return endlessConsumer; } List partitions() @@ -205,11 +188,28 @@ class ApplicationTests kafkaConsumer.unsubscribe(); } - void check(Map offsets) + void checkSeenOffsetsForProgress() + { + // Be sure, that some messages were consumed...! + Set withProgress = new HashSet<>(); + partitions().forEach(tp -> + { + Long oldOffset = oldOffsets.get(tp); + Long newOffset = newOffsets.get(tp); + if (!oldOffset.equals(newOffset)) + { + log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); + withProgress.add(tp); + } + }); + assertThat(withProgress).isNotEmpty().describedAs("Found no partitions with any offset-progress"); + } + + void compareToCommitedOffsets(Map offsetsToCheck) { doForCurrentOffsets((tp, offset) -> { - Long expected = offsets.get(tp) + 1; + Long expected = offsetsToCheck.get(tp) + 1; log.debug("Checking, if the offset for {} is {}", tp, expected); assertThat(offset).isEqualTo(expected); });