Tests: Überprüft, ob überhaupt ein Offset-Fortschritt vorliegt
authorKai Moritz <kai@juplo.de>
Sun, 10 Apr 2022 13:25:58 +0000 (15:25 +0200)
committerKai Moritz <kai@juplo.de>
Mon, 11 Apr 2022 13:23:50 +0000 (15:23 +0200)
src/test/java/de/juplo/kafka/ApplicationTests.java

index 8461824..bf1cdb8 100644 (file)
@@ -70,15 +70,30 @@ class ApplicationTests
        {
                send100Messages(i ->  new Bytes(longSerializer.serialize(TOPIC, i)));
 
+               Map<TopicPartition, Long> oldOffsets = new HashMap<>();
+               doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp, offset -1));
                Set<ConsumerRecord<String, Long>> received = new HashSet<>();
-               Map<TopicPartition, Long> offsets = runEndlessConsumer(record ->
+               Map<TopicPartition, Long> newOffsets = runEndlessConsumer(record ->
                {
                        received.add(record);
                        if (received.size() == 100)
                                throw new WakeupException();
                });
 
-               check(offsets);
+               Set<TopicPartition> withProgress = new HashSet<>();
+               partitions().forEach(tp ->
+               {
+                       Long oldOffset = oldOffsets.get(tp);
+                       Long newOffset = newOffsets.get(tp);
+                       if (!oldOffset.equals(newOffset))
+                       {
+                               log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
+                               withProgress.add(tp);
+                       }
+               });
+               assertThat(withProgress).isNotEmpty().describedAs("Found no partitions with any offset-progress");
+
+               check(newOffsets);
        }
 
        @Test
@@ -94,6 +109,19 @@ class ApplicationTests
                doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp, offset -1));
                Map<TopicPartition, Long> newOffsets = runEndlessConsumer((record) -> {});
 
+               Set<TopicPartition> withProgress = new HashSet<>();
+               partitions().forEach(tp ->
+               {
+                       Long oldOffset = oldOffsets.get(tp);
+                       Long newOffset = newOffsets.get(tp);
+                       if (!oldOffset.equals(newOffset))
+                       {
+                               log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
+                               withProgress.add(tp);
+                       }
+               });
+               assertThat(withProgress).isNotEmpty().describedAs("Found no partitions with any offset-progress");
+
                check(oldOffsets);
        }