Tests: Refaktorisiert - DRY für Test auf Offset-Fortschritt
authorKai Moritz <kai@juplo.de>
Sun, 10 Apr 2022 13:54:26 +0000 (15:54 +0200)
committerKai Moritz <kai@juplo.de>
Mon, 11 Apr 2022 13:23:50 +0000 (15:23 +0200)
src/test/java/de/juplo/kafka/ApplicationTests.java

index bf1cdb8..6aaff52 100644 (file)
@@ -63,6 +63,9 @@ class ApplicationTests
        @Autowired
        ExecutorService executor;
 
+       Map<TopicPartition, Long> oldOffsets;
+       Map<TopicPartition, Long> newOffsets;
+
 
        @Test
        @Order(1) // << The poistion pill is not skipped. Hence, this test must run first
@@ -70,30 +73,16 @@ class ApplicationTests
        {
                send100Messages(i ->  new Bytes(longSerializer.serialize(TOPIC, i)));
 
-               Map<TopicPartition, Long> oldOffsets = new HashMap<>();
-               doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp, offset -1));
                Set<ConsumerRecord<String, Long>> received = new HashSet<>();
-               Map<TopicPartition, Long> newOffsets = runEndlessConsumer(record ->
+               runEndlessConsumer(record ->
                {
                        received.add(record);
                        if (received.size() == 100)
                                throw new WakeupException();
                });
 
-               Set<TopicPartition> withProgress = new HashSet<>();
-               partitions().forEach(tp ->
-               {
-                       Long oldOffset = oldOffsets.get(tp);
-                       Long newOffset = newOffsets.get(tp);
-                       if (!oldOffset.equals(newOffset))
-                       {
-                               log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
-                               withProgress.add(tp);
-                       }
-               });
-               assertThat(withProgress).isNotEmpty().describedAs("Found no partitions with any offset-progress");
-
-               check(newOffsets);
+               checkSeenOffsetsForProgress();
+               compareToCommitedOffsets(newOffsets);
        }
 
        @Test
@@ -105,24 +94,10 @@ class ApplicationTests
                                                ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
                                                : new Bytes(longSerializer.serialize(TOPIC, counter)));
 
-               Map<TopicPartition, Long> oldOffsets = new HashMap<>();
-               doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp, offset -1));
-               Map<TopicPartition, Long> newOffsets = runEndlessConsumer((record) -> {});
-
-               Set<TopicPartition> withProgress = new HashSet<>();
-               partitions().forEach(tp ->
-               {
-                       Long oldOffset = oldOffsets.get(tp);
-                       Long newOffset = newOffsets.get(tp);
-                       if (!oldOffset.equals(newOffset))
-                       {
-                               log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
-                               withProgress.add(tp);
-                       }
-               });
-               assertThat(withProgress).isNotEmpty().describedAs("Found no partitions with any offset-progress");
+               runEndlessConsumer((record) -> {});
 
-               check(oldOffsets);
+               checkSeenOffsetsForProgress();
+               compareToCommitedOffsets(oldOffsets);
        }
 
 
@@ -167,15 +142,23 @@ class ApplicationTests
                }
        }
 
-       Map<TopicPartition, Long> runEndlessConsumer(Consumer<ConsumerRecord<String, Long>> consumer)
+       EndlessConsumer<String, Long> runEndlessConsumer(Consumer<ConsumerRecord<String, Long>> consumer)
        {
-               Map<TopicPartition, Long> offsets = new HashMap<>();
-               doForCurrentOffsets((tp, offset) -> offsets.put(tp, offset -1));
+               oldOffsets = new HashMap<>();
+               newOffsets = new HashMap<>();
+
+               doForCurrentOffsets((tp, offset) ->
+               {
+                       oldOffsets.put(tp, offset - 1);
+                       newOffsets.put(tp, offset - 1);
+               });
+
                Consumer<ConsumerRecord<String, Long>> captureOffset =
                                record ->
-                                               offsets.put(
+                                               newOffsets.put(
                                                                new TopicPartition(record.topic(), record.partition()),
                                                                record.offset());
+
                EndlessConsumer<String, Long> endlessConsumer =
                                new EndlessConsumer<>(
                                                executor,
@@ -186,7 +169,7 @@ class ApplicationTests
 
                endlessConsumer.run();
 
-               return offsets;
+               return endlessConsumer;
        }
 
        List<TopicPartition> partitions()
@@ -205,11 +188,28 @@ class ApplicationTests
                kafkaConsumer.unsubscribe();
        }
 
-       void check(Map<TopicPartition, Long> offsets)
+       void checkSeenOffsetsForProgress()
+       {
+               // Be sure, that some messages were consumed...!
+               Set<TopicPartition> withProgress = new HashSet<>();
+               partitions().forEach(tp ->
+               {
+                       Long oldOffset = oldOffsets.get(tp);
+                       Long newOffset = newOffsets.get(tp);
+                       if (!oldOffset.equals(newOffset))
+                       {
+                               log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
+                               withProgress.add(tp);
+                       }
+               });
+               assertThat(withProgress).isNotEmpty().describedAs("Found no partitions with any offset-progress");
+       }
+
+       void compareToCommitedOffsets(Map<TopicPartition, Long> offsetsToCheck)
        {
                doForCurrentOffsets((tp, offset) ->
                {
-                       Long expected = offsets.get(tp) + 1;
+                       Long expected = offsetsToCheck.get(tp) + 1;
                        log.debug("Checking, if the offset for {} is {}", tp, expected);
                        assertThat(offset).isEqualTo(expected);
                });