From: Kai Moritz Date: Sun, 11 Sep 2022 11:42:57 +0000 (+0200) Subject: Anzahl der Fehler für die Test-Logik verfügbar gemacht X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Fdeserialization;p=demos%2Fkafka%2Ftraining Anzahl der Fehler für die Test-Logik verfügbar gemacht * Conflicts: ** src/test/java/de/juplo/kafka/ApplicationTests.java --- diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index b7f8308..e5e0982 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -22,46 +22,66 @@ public class ApplicationTests extends GenericApplicationTests final StringSerializer stringSerializer = new StringSerializer(); final LongSerializer longSerializer = new LongSerializer(); + int numberOfMessages; + int numberOfLogicErrors; + int numberOfPoisonPills; + @Override - public int generate( + public void generate( boolean poisonPills, boolean logicErrors, Consumer> messageSender) { - int i = 0; - for (int partition = 0; partition < 10; partition++) { for (int key = 0; key < 10000; key++) { - i++; + numberOfMessages++; - Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long)i)); - if (i == 99977) + Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long) numberOfMessages)); + if (numberOfMessages == 99977) { if (logicErrors) { value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE)); + numberOfLogicErrors++; } if (poisonPills) { value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!")); + numberOfPoisonPills++; } } ProducerRecord record = - new ProducerRecord<>( - TOPIC, - partition, - new Bytes(stringSerializer.serialize(TOPIC,Integer.toString(partition*10+key%2))), - value); + new ProducerRecord<>( + TOPIC, + partition, + new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(partition * 10 + key % 2))), + value); messageSender.accept(record); } } + } + + @Override + public int getNumberOfMessages() + { + return numberOfMessages; + } - return i; + @Override + public int getNumberOfPoisonPills() + { + return numberOfPoisonPills; + } + + @Override + public int getNumberOfLogicErrors() + { + return numberOfLogicErrors; } }); } diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 4883f75..5019466 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -78,8 +78,9 @@ abstract class GenericApplicationTests @Test void commitsCurrentOffsetsOnSuccess() throws Exception { - int numberOfGeneratedMessages = - recordGenerator.generate(false, false, messageSender); + recordGenerator.generate(false, false, messageSender); + + int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages(); await(numberOfGeneratedMessages + " records received") .atMost(Duration.ofSeconds(30)) @@ -107,8 +108,9 @@ abstract class GenericApplicationTests @SkipWhenErrorCannotBeGenerated(poisonPill = true) void commitsOffsetOfErrorForReprocessingOnDeserializationError() { - int numberOfGeneratedMessages = - recordGenerator.generate(true, false, messageSender); + recordGenerator.generate(true, false, messageSender); + + int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages(); await("Consumer failed") .atMost(Duration.ofSeconds(30)) @@ -144,8 +146,9 @@ abstract class GenericApplicationTests @SkipWhenErrorCannotBeGenerated(logicError = true) void doesNotCommitOffsetsOnLogicError() { - int numberOfGeneratedMessages = - recordGenerator.generate(false, true, messageSender); + recordGenerator.generate(false, true, messageSender); + + int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages(); await("Consumer failed") .atMost(Duration.ofSeconds(30)) @@ -263,11 +266,15 @@ abstract class GenericApplicationTests public interface RecordGenerator { - int generate( + void generate( boolean poisonPills, boolean logicErrors, Consumer> messageSender); + int getNumberOfMessages(); + int getNumberOfPoisonPills(); + int getNumberOfLogicErrors(); + default boolean canGeneratePoisonPill() { return true;