X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;h=6f58180f03c4a6a0a4741af5e43ba2c5e6155ca5;hb=4a6907ee4675c3d9bc7fd4ba31a7048527f53a50;hp=e35b223666e6b1519292f513f5cd9280b1f5278c;hpb=2939f4b7bae23df34968ce3d87be1b83cf0fba90;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index e35b223..6f58180 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -97,7 +97,7 @@ class ApplicationTests @Test @Order(2) - void commitsNoOffsetsOnError() + void commitsOffsetOfErrorForReprocessingOnError() { send100Messages(counter -> counter == 77 @@ -109,7 +109,15 @@ class ApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - compareToCommitedOffsets(oldOffsets); + compareToCommitedOffsets(newOffsets); + + endlessConsumer.start(); + await("Consumer failed") + .atMost(Duration.ofSeconds(30)) + .until(() -> !endlessConsumer.running()); + + checkSeenOffsetsForProgress(); + compareToCommitedOffsets(newOffsets); }