X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;h=3ded0d267c1e61968c17df21bd01e57706d27013;hb=b8bdc40c0441591c1dc5ab5a3922db1c1dca2ed9;hp=cbf215ef4f47b97f89b46abee37aa58b87195c66;hpb=a9200a876060edc8683dfd6d0d16c23407c189ad;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index cbf215e..3ded0d2 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -67,10 +67,13 @@ class ApplicationTests @Autowired ApplicationProperties properties; @Autowired + EndlessConsumer endlessConsumer; + @Autowired RecordHandler recordHandler; Map oldOffsets; Map newOffsets; + Set> receivedRecords; /** Tests methods */ @@ -81,12 +84,9 @@ class ApplicationTests { send100Messages(i -> new Bytes(longSerializer.serialize(TOPIC, i))); - Set> received = new HashSet<>(); - recordHandler.testHandler = record -> received.add(record); - await("100 records received") .atMost(Duration.ofSeconds(30)) - .until(() -> received.size() >= 100); + .until(() -> receivedRecords.size() >= 100); await("Offsets committed") .atMost(Duration.ofSeconds(10)) @@ -111,6 +111,9 @@ class ApplicationTests .untilAsserted(() -> checkSeenOffsetsForProgress()); compareToCommitedOffsets(newOffsets); + assertThat(receivedRecords.size()) + .describedAs("Received not all sent events") + .isLessThan(100); } @@ -122,7 +125,9 @@ class ApplicationTests { Long expected = offsetsToCheck.get(tp) + 1; log.debug("Checking, if the offset for {} is {}", tp, expected); - assertThat(offset).isEqualTo(expected); + assertThat(offset) + .describedAs("Committed offset corresponds to the offset of the consumer") + .isEqualTo(expected); }); } @@ -140,7 +145,9 @@ class ApplicationTests withProgress.add(tp); } }); - assertThat(withProgress).isNotEmpty().describedAs("Found no partitions with any offset-progress"); + assertThat(withProgress) + .describedAs("Some offsets must have changed, compared to the old offset-positions") + .isNotEmpty(); } @@ -212,6 +219,7 @@ class ApplicationTests oldOffsets = new HashMap<>(); newOffsets = new HashMap<>(); + receivedRecords = new HashSet<>(); doForCurrentOffsets((tp, offset) -> { @@ -221,11 +229,28 @@ class ApplicationTests recordHandler.captureOffsets = record -> + { + receivedRecords.add(record); newOffsets.put( new TopicPartition(record.topic(), record.partition()), record.offset()); + }; + + endlessConsumer.start(); } + @AfterEach + public void deinit() + { + try + { + endlessConsumer.stop(); + } + catch (Exception e) + { + log.info("Exception while stopping the consumer: {}", e.toString()); + } + } public static class RecordHandler implements Consumer> {