From c446512ec3bfa29e5e8482074cb6daf7e2ee1b2f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 26 Jul 2022 15:37:43 +0200 Subject: [PATCH] =?utf8?q?Testfall=20=C3=BCberarbeitet?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Abhängigkeit der Testergebnisse von Ausführreihenfolge beseitigt. * Die Abhängigkeit bestand, da die Offset-Positionen als Zustand die Testausführung überdauert haben. * Daher konnte kein weiterer Test mehr ausgeführt werden, nachdem einmal eine Poison-Pill in das Topic geschrieben wurde, über die der implementierte Consumer stolpert. * Um das zu umgehen, werden die Offset-Positionen jetzt nach jedem Test auf das Ende der Partitionen verschoben. D.h., wenn in dem Test eine Poision-Pill geschrieben wird, über die der implementierte Consumer nicht hinweglesen kann, werden die Offests vor der Ausführung des nächsten Tests über diese Poision-Pill hinweg gesetzt. * Dadurch ist wurde ein Fehler / eine Schwäche in der Testlogik aufgedeckt: In dem Test für das erfolgreiche Schreiben wurde nur deswegen ein Commit ausgeführt, weil zuvor noch kein Commit durchgeführt wurde, so dass der Default-Wert für das Commit-Interval immer überschritten war. * Um das zu umgehen, wurde eine Konfigurations-Option für das Setzen des Parameters `auto.commit.interval` eingeführt, so dass im Test sichergestellt werden kann, dass auf jeden Fall in dem beobachteten Zeitraum ein automatischer Commit ausgelöst wird. * Außerdem: Weniger verwirrende Ausgabe des Offset-Fortschritts. --- .../juplo/kafka/ApplicationConfiguration.java | 1 + .../de/juplo/kafka/ApplicationProperties.java | 3 ++ src/main/resources/application.yml | 1 + .../java/de/juplo/kafka/ApplicationTests.java | 29 ++++++++++++++----- 4 files changed, 27 insertions(+), 7 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 4054e93..766740b 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -58,6 +58,7 @@ public class ApplicationConfiguration props.put("group.id", properties.getGroupId()); props.put("client.id", properties.getClientId()); props.put("auto.offset.reset", properties.getAutoOffsetReset()); + props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis()); props.put("metadata.max.age.ms", "1000"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", LongDeserializer.class.getName()); diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index fa731c5..14e928f 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -7,6 +7,7 @@ import org.springframework.validation.annotation.Validated; import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; +import java.time.Duration; @ConfigurationProperties(prefix = "consumer") @@ -30,4 +31,6 @@ public class ApplicationProperties @NotNull @NotEmpty private String autoOffsetReset; + @NotNull + private Duration commitInterval; } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 9f3cb81..f8bfe7e 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -4,6 +4,7 @@ consumer: client-id: DEV topic: test auto-offset-reset: earliest + commit-interval: 5s management: endpoint: shutdown: diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 05eebd0..26a34e4 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -25,7 +25,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; import java.util.function.Consumer; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -40,7 +39,8 @@ import static org.awaitility.Awaitility.*; @TestPropertySource( properties = { "consumer.bootstrap-server=${spring.embedded.kafka.brokers}", - "consumer.topic=" + TOPIC }) + "consumer.topic=" + TOPIC, + "consumer.commit-interval=100ms" }) @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) @Slf4j class ApplicationTests @@ -74,7 +74,6 @@ class ApplicationTests /** Tests methods */ @Test - @Order(1) // << The poistion pill is not skipped. Hence, this test must run first void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException { send100Messages((partition, key, counter) -> @@ -101,8 +100,7 @@ class ApplicationTests } @Test - @Order(2) - void commitsOffsetOfErrorForReprocessingOnError() + void commitsOffsetOfErrorForReprocessingOnDeserializationError() { send100Messages((partition, key, counter) -> { @@ -159,8 +157,8 @@ class ApplicationTests Set withProgress = new HashSet<>(); partitions().forEach(tp -> { - Long oldOffset = oldOffsets.get(tp); - Long newOffset = newOffsets.get(tp); + Long oldOffset = oldOffsets.get(tp) + 1; + Long newOffset = newOffsets.get(tp) + 1; if (!oldOffset.equals(newOffset)) { log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); @@ -175,6 +173,21 @@ class ApplicationTests /** Helper methods for setting up and running the tests */ + void seekToEnd() + { + offsetConsumer.assign(partitions()); + offsetConsumer.seekToEnd(partitions()); + partitions().forEach(tp -> + { + // seekToEnd() works lazily: it only takes effect on poll()/position() + Long offset = offsetConsumer.position(tp); + log.info("New position for {}: {}", tp, offset); + }); + // The new positions must be commited! + offsetConsumer.commitSync(); + offsetConsumer.unsubscribe(); + } + void doForCurrentOffsets(BiConsumer consumer) { offsetConsumer.assign(partitions()); @@ -238,6 +251,8 @@ class ApplicationTests { testHandler = record -> {} ; + seekToEnd(); + oldOffsets = new HashMap<>(); newOffsets = new HashMap<>(); receivedRecords = new HashSet<>(); -- 2.20.1