X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTests.java;h=0f40058b2e8ed93582e0456e9aa1be9c5b3f5d17;hb=a89b6d6a026cc32da8186d52c48b25ac1e6249c2;hp=93daf6ba490414763fbc76f460130272096d102d;hpb=600b0b10f8e98bebef80f75e391a78c459ffb45c;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 93daf6b..0f40058 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -62,6 +62,8 @@ abstract class GenericApplicationTests @Autowired ExecutorService executor; @Autowired + StateRepository stateRepository; + @Autowired MongoClient mongoClient; @Autowired MongoProperties mongoProperties; @@ -74,7 +76,7 @@ abstract class GenericApplicationTests KafkaConsumer offsetConsumer; EndlessConsumer endlessConsumer; Map oldOffsets; - Map newOffsets; + Map seenOffsets; Set> receivedRecords; @@ -107,7 +109,7 @@ abstract class GenericApplicationTests .untilAsserted(() -> { checkSeenOffsetsForProgress(); - compareToCommitedOffsets(newOffsets); + assertSeenOffsetsEqualCommittedOffsets(seenOffsets); }); assertThatExceptionOfType(IllegalStateException.class) @@ -130,7 +132,7 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - compareToCommitedOffsets(newOffsets); + assertSeenOffsetsEqualCommittedOffsets(seenOffsets); endlessConsumer.start(); await("Consumer failed") @@ -139,7 +141,7 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - compareToCommitedOffsets(newOffsets); + assertSeenOffsetsEqualCommittedOffsets(seenOffsets); assertThat(receivedRecords.size()) .describedAs("Received not all sent events") .isLessThan(numberOfGeneratedMessages); @@ -167,7 +169,7 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - compareToCommitedOffsets(oldOffsets); + assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets); endlessConsumer.start(); await("Consumer failed") @@ -175,8 +177,7 @@ abstract class GenericApplicationTests .pollInterval(Duration.ofSeconds(1)) .until(() -> !endlessConsumer.running()); - checkSeenOffsetsForProgress(); - compareToCommitedOffsets(oldOffsets); + assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets); assertThatNoException() .describedAs("Consumer should not be running") @@ -191,18 +192,37 @@ abstract class GenericApplicationTests /** Helper methods for the verification of expectations */ - void compareToCommitedOffsets(Map offsetsToCheck) + void assertSeenOffsetsEqualCommittedOffsets(Map offsetsToCheck) { doForCurrentOffsets((tp, offset) -> { Long expected = offsetsToCheck.get(tp) + 1; - log.debug("Checking, if the offset for {} is {}", tp, expected); + log.debug("Checking, if the offset {} for {} is exactly {}", offset, tp, expected); assertThat(offset) .describedAs("Committed offset corresponds to the offset of the consumer") .isEqualTo(expected); }); } + void assertSeenOffsetsAreBehindCommittedOffsets(Map offsetsToCheck) + { + List isOffsetBehindSeen = new LinkedList<>(); + + doForCurrentOffsets((tp, offset) -> + { + Long expected = offsetsToCheck.get(tp) + 1; + log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected); + assertThat(offset) + .describedAs("Committed offset corresponds to the offset of the consumer") + .isLessThanOrEqualTo(expected); + isOffsetBehindSeen.add(offset < expected); + }); + + assertThat(isOffsetBehindSeen.stream().reduce(false, (result, next) -> result | next)) + .describedAs("Committed offsets are behind seen offsets") + .isTrue(); + } + void checkSeenOffsetsForProgress() { // Be sure, that some messages were consumed...! @@ -210,7 +230,7 @@ abstract class GenericApplicationTests partitions().forEach(tp -> { Long oldOffset = oldOffsets.get(tp) + 1; - Long newOffset = newOffsets.get(tp) + 1; + Long newOffset = seenOffsets.get(tp) + 1; if (!oldOffset.equals(newOffset)) { log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); @@ -228,23 +248,29 @@ abstract class GenericApplicationTests void seekToEnd() { offsetConsumer.assign(partitions()); - offsetConsumer.seekToEnd(partitions()); partitions().forEach(tp -> { - // seekToEnd() works lazily: it only takes effect on poll()/position() Long offset = offsetConsumer.position(tp); log.info("New position for {}: {}", tp, offset); + Integer partition = tp.partition(); + StateDocument document = + stateRepository + .findById(partition.toString()) + .orElse(new StateDocument(partition)); + document.offset = offset; + stateRepository.save(document); }); - // The new positions must be commited! - offsetConsumer.commitSync(); offsetConsumer.unsubscribe(); } void doForCurrentOffsets(BiConsumer consumer) { - offsetConsumer.assign(partitions()); - partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp))); - offsetConsumer.unsubscribe(); + partitions().forEach(tp -> + { + String partition = Integer.toString(tp.partition()); + Optional offset = stateRepository.findById(partition).map(document -> document.offset); + consumer.accept(tp, offset.orElse(0l)); + }); } List partitions() @@ -324,16 +350,17 @@ abstract class GenericApplicationTests props.put("value.deserializer", BytesDeserializer.class.getName()); offsetConsumer = new KafkaConsumer<>(props); + mongoClient.getDatabase(mongoProperties.getDatabase()).drop(); seekToEnd(); oldOffsets = new HashMap<>(); - newOffsets = new HashMap<>(); + seenOffsets = new HashMap<>(); receivedRecords = new HashSet<>(); doForCurrentOffsets((tp, offset) -> { oldOffsets.put(tp, offset - 1); - newOffsets.put(tp, offset - 1); + seenOffsets.put(tp, offset - 1); }); TestRecordHandler captureOffsetAndExecuteTestHandler = @@ -342,15 +369,13 @@ abstract class GenericApplicationTests @Override public void onNewRecord(ConsumerRecord record) { - newOffsets.put( + seenOffsets.put( new TopicPartition(record.topic(), record.partition()), record.offset()); receivedRecords.add(record); } }; - mongoClient.getDatabase(mongoProperties.getDatabase()).drop(); - endlessConsumer = new EndlessConsumer<>( executor,