From: Kai Moritz Date: Fri, 15 Apr 2022 12:01:07 +0000 (+0200) Subject: Springify: ROT - Merge des verschärften Tests aus der Vanilla-Version X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=b8bdc40c0441591c1dc5ab5a3922db1c1dca2ed9;p=demos%2Fkafka%2Ftraining Springify: ROT - Merge des verschärften Tests aus der Vanilla-Version * Ohne die Verschärfung des Tests war der Test grün, obwohl die Anwendung entgegen den Erwartungen trotz der eingetreuten Poison-Pill _alle_ Nachrichten gelesen hat. * Jetzt schlägt der Test wie erwartet fehl. --- b8bdc40c0441591c1dc5ab5a3922db1c1dca2ed9 diff --cc src/test/java/de/juplo/kafka/ApplicationTests.java index 35d2b2e,4cc4f91..3ded0d2 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@@ -67,12 -63,13 +67,13 @@@ class ApplicationTest @Autowired ApplicationProperties properties; @Autowired - ExecutorService executor; + EndlessConsumer endlessConsumer; + @Autowired + RecordHandler recordHandler; - Consumer> testHandler; - EndlessConsumer endlessConsumer; Map oldOffsets; Map newOffsets; + Set> receivedRecords; /** Tests methods */ @@@ -110,9 -104,21 +108,12 @@@ await("Consumer failed") .atMost(Duration.ofSeconds(30)) - .until(() -> !endlessConsumer.running()); + .untilAsserted(() -> checkSeenOffsetsForProgress()); - checkSeenOffsetsForProgress(); - compareToCommitedOffsets(newOffsets); - - endlessConsumer.start(); - await("Consumer failed") - .atMost(Duration.ofSeconds(30)) - .until(() -> !endlessConsumer.running()); - - checkSeenOffsetsForProgress(); compareToCommitedOffsets(newOffsets); + assertThat(receivedRecords.size()) + .describedAs("Received not all sent events") + .isLessThan(100); } @@@ -221,12 -232,24 +227,15 @@@ newOffsets.put(tp, offset - 1); }); - Consumer> captureOffsetAndExecuteTestHandler = + recordHandler.captureOffsets = record -> + { ++ receivedRecords.add(record); newOffsets.put( new TopicPartition(record.topic(), record.partition()), record.offset()); - receivedRecords.add(record); - testHandler.accept(record); + }; - endlessConsumer = - new EndlessConsumer<>( - executor, - properties.getClientId(), - properties.getTopic(), - kafkaConsumer, - captureOffsetAndExecuteTestHandler); - endlessConsumer.start(); }