From: Kai Moritz Date: Sun, 14 Aug 2022 13:35:28 +0000 (+0200) Subject: Signatur und Handling des `RecordGenerator` vereinfacht/überarbeitet X-Git-Tag: sumup-adder---lvm-2-tage~9^2~3 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=1288af99aeb350661f8b0a60762cba8e1b0f6a24;p=demos%2Fkafka%2Ftraining Signatur und Handling des `RecordGenerator` vereinfacht/überarbeitet * Der `RecordGenerator` darf jetzt selbst bestimmen, wie viele Nachrichten er erzeugt und wo wieviele Poison-Pills oder Logik-Fehler erzeugt werden, wenn der Test dies anfordert. * Dafür git der `RecordGenerator` jetzt die Anzahl der tatsächlich erzeugten Nachrichten zurück, damit die Tests richtig reagieren können. --- diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 1272124..8369a7b 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -27,9 +27,9 @@ public class ApplicationTests extends GenericApplicationTests @Override - public void generate( - Set poisonPills, - Set logicErrors, + public int generate( + boolean poisonPills, + boolean logicErrors, Consumer> messageSender) { int i = 0; @@ -38,17 +38,19 @@ public class ApplicationTests extends GenericApplicationTests { for (int key = 0; key < 10; key++) { - if (++i > 100) - return; + i++; Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long)i)); - if (logicErrors.contains(i)) + if (i == 77) { - value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE)); - } - if (poisonPills.contains(i)) - { - value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!")); + if (logicErrors) + { + value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE)); + } + if (poisonPills) + { + value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!")); + } } ProducerRecord record = @@ -61,6 +63,8 @@ public class ApplicationTests extends GenericApplicationTests messageSender.accept(record); } } + + return i; } }); } diff --git a/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java b/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java index 99af3b2..606218f 100644 --- a/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java +++ b/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java @@ -30,7 +30,7 @@ public class ErrorCannotBeGeneratedCondition implements ExecutionCondition GenericApplicationTests instance = (GenericApplicationTests)context.getTestInstance().get(); List missingRequiredErrors = new LinkedList<>(); - if (skipWhenErrorCannotBeGenerated.poisonPill() && !instance.recordGenerator.canGeneratePoisionPill()) + if (skipWhenErrorCannotBeGenerated.poisonPill() && !instance.recordGenerator.canGeneratePoisonPill()) missingRequiredErrors.add("Poison-Pill"); if (skipWhenErrorCannotBeGenerated.logicError() && !instance.recordGenerator.canGenerateLogicError()) diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 1aacb94..9175e52 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -78,12 +78,13 @@ abstract class GenericApplicationTests @Test void commitsCurrentOffsetsOnSuccess() { - recordGenerator.generate(100, Set.of(), Set.of(), messageSender); + int numberOfGeneratedMessages = + recordGenerator.generate(false, false, messageSender); - await("100 records received") + await(numberOfGeneratedMessages + " records received") .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> receivedRecords.size() >= 100); + .until(() -> receivedRecords.size() >= numberOfGeneratedMessages); await("Offsets committed") .atMost(Duration.ofSeconds(10)) @@ -103,7 +104,8 @@ abstract class GenericApplicationTests @SkipWhenErrorCannotBeGenerated(poisonPill = true) void commitsOffsetOfErrorForReprocessingOnDeserializationError() { - recordGenerator.generate(100, Set.of(77), Set.of(), messageSender); + int numberOfGeneratedMessages = + recordGenerator.generate(true, false, messageSender); await("Consumer failed") .atMost(Duration.ofSeconds(30)) @@ -123,7 +125,7 @@ abstract class GenericApplicationTests compareToCommitedOffsets(newOffsets); assertThat(receivedRecords.size()) .describedAs("Received not all sent events") - .isLessThan(100); + .isLessThan(numberOfGeneratedMessages); assertThatNoException() .describedAs("Consumer should not be running") @@ -137,7 +139,8 @@ abstract class GenericApplicationTests @SkipWhenErrorCannotBeGenerated(logicError = true) void doesNotCommitOffsetsOnLogicError() { - recordGenerator.generate(100, Set.of(), Set.of(77), messageSender); + int numberOfGeneratedMessages = + recordGenerator.generate(false, true, messageSender); await("Consumer failed") .atMost(Duration.ofSeconds(30)) @@ -157,7 +160,7 @@ abstract class GenericApplicationTests compareToCommitedOffsets(oldOffsets); assertThat(receivedRecords.size()) .describedAs("Received not all sent events") - .isLessThan(100); + .isLessThan(numberOfGeneratedMessages); assertThatNoException() .describedAs("Consumer should not be running") @@ -238,12 +241,12 @@ abstract class GenericApplicationTests public interface RecordGenerator { - void generate( - Set poisonPills, - Set logicErrors, + int generate( + boolean poisonPills, + boolean logicErrors, Consumer> messageSender); - default boolean canGeneratePoisionPill() + default boolean canGeneratePoisonPill() { return true; }