From f6212abfac1d872979d2a27f5a6bf4708b643db6 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 10 Apr 2022 15:25:58 +0200 Subject: [PATCH] =?utf8?q?Tests:=20=C3=9Cberpr=C3=BCft,=20ob=20=C3=BCberha?= =?utf8?q?upt=20ein=20Offset-Fortschritt=20vorliegt?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../java/de/juplo/kafka/ApplicationTests.java | 32 +++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 8461824..bf1cdb8 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -70,15 +70,30 @@ class ApplicationTests { send100Messages(i -> new Bytes(longSerializer.serialize(TOPIC, i))); + Map oldOffsets = new HashMap<>(); + doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp, offset -1)); Set> received = new HashSet<>(); - Map offsets = runEndlessConsumer(record -> + Map newOffsets = runEndlessConsumer(record -> { received.add(record); if (received.size() == 100) throw new WakeupException(); }); - check(offsets); + Set withProgress = new HashSet<>(); + partitions().forEach(tp -> + { + Long oldOffset = oldOffsets.get(tp); + Long newOffset = newOffsets.get(tp); + if (!oldOffset.equals(newOffset)) + { + log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); + withProgress.add(tp); + } + }); + assertThat(withProgress).isNotEmpty().describedAs("Found no partitions with any offset-progress"); + + check(newOffsets); } @Test @@ -94,6 +109,19 @@ class ApplicationTests doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp, offset -1)); Map newOffsets = runEndlessConsumer((record) -> {}); + Set withProgress = new HashSet<>(); + partitions().forEach(tp -> + { + Long oldOffset = oldOffsets.get(tp); + Long newOffset = newOffsets.get(tp); + if (!oldOffset.equals(newOffset)) + { + log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); + withProgress.add(tp); + } + }); + assertThat(withProgress).isNotEmpty().describedAs("Found no partitions with any offset-progress"); + check(oldOffsets); } -- 2.20.1