From: Kai Moritz Date: Sun, 10 Apr 2022 13:25:58 +0000 (+0200) Subject: Tests: Überprüft, ob überhaupt ein Offset-Fortschritt vorliegt X-Git-Tag: deserialization-synchroner-test~2 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=f6212abfac1d872979d2a27f5a6bf4708b643db6;p=demos%2Fkafka%2Ftraining Tests: Überprüft, ob überhaupt ein Offset-Fortschritt vorliegt --- diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 8461824..bf1cdb8 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -70,15 +70,30 @@ class ApplicationTests { send100Messages(i -> new Bytes(longSerializer.serialize(TOPIC, i))); + Map oldOffsets = new HashMap<>(); + doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp, offset -1)); Set> received = new HashSet<>(); - Map offsets = runEndlessConsumer(record -> + Map newOffsets = runEndlessConsumer(record -> { received.add(record); if (received.size() == 100) throw new WakeupException(); }); - check(offsets); + Set withProgress = new HashSet<>(); + partitions().forEach(tp -> + { + Long oldOffset = oldOffsets.get(tp); + Long newOffset = newOffsets.get(tp); + if (!oldOffset.equals(newOffset)) + { + log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); + withProgress.add(tp); + } + }); + assertThat(withProgress).isNotEmpty().describedAs("Found no partitions with any offset-progress"); + + check(newOffsets); } @Test @@ -94,6 +109,19 @@ class ApplicationTests doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp, offset -1)); Map newOffsets = runEndlessConsumer((record) -> {}); + Set withProgress = new HashSet<>(); + partitions().forEach(tp -> + { + Long oldOffset = oldOffsets.get(tp); + Long newOffset = newOffsets.get(tp); + if (!oldOffset.equals(newOffset)) + { + log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); + withProgress.add(tp); + } + }); + assertThat(withProgress).isNotEmpty().describedAs("Found no partitions with any offset-progress"); + check(oldOffsets); }