X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;h=a632a89c41f936b88f1f23742e78183279138ef3;hb=b3777fba0ae679d9e2c9d36626fa208a952f83e8;hp=f4c21041bf7c8febdb1b14abc77efc96c1be139b;hpb=818c1eb862247e25abf9f7d91d5a73e3e3789a39;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index f4c2104..a632a89 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -6,6 +6,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.serialization.*; import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.*; @@ -20,7 +21,6 @@ import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; -import java.time.Clock; import java.time.Duration; import java.util.*; import java.util.concurrent.ExecutionException; @@ -61,7 +61,7 @@ class ApplicationTests @Autowired KafkaProducer kafkaProducer; @Autowired - KafkaConsumer kafkaConsumer; + KafkaConsumer kafkaConsumer; @Autowired PartitionStatisticsRepository partitionStatisticsRepository; @Autowired @@ -71,19 +71,20 @@ class ApplicationTests @Autowired PartitionStatisticsRepository repository; @Autowired - WordcountRebalanceListener wordcountRebalanceListener; + KeyCountingRebalanceListener keyCountingRebalanceListener; @Autowired - WordcountRecordHandler wordcountRecordHandler; + KeyCountingRecordHandler keyCountingRecordHandler; - EndlessConsumer endlessConsumer; + EndlessConsumer endlessConsumer; Map oldOffsets; Map newOffsets; - Set> receivedRecords; + Set> receivedRecords; /** Tests methods */ @Test + @Order(1) // << The poistion pill is not skipped. Hence, this test must run first void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException { send100Messages(i -> new Bytes(valueSerializer.serialize(TOPIC, i))); @@ -105,6 +106,41 @@ class ApplicationTests .describedAs("Consumer should still be running"); } + @Test + @Order(2) + void commitsOffsetOfErrorForReprocessingOnError() + { + send100Messages(counter -> + counter == 77 + ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!")) + : new Bytes(valueSerializer.serialize(TOPIC, counter))); + + await("Consumer failed") + .atMost(Duration.ofSeconds(30)) + .until(() -> !endlessConsumer.running()); + + checkSeenOffsetsForProgress(); + compareToCommitedOffsets(newOffsets); + + endlessConsumer.start(); + await("Consumer failed") + .atMost(Duration.ofSeconds(30)) + .until(() -> !endlessConsumer.running()); + + checkSeenOffsetsForProgress(); + compareToCommitedOffsets(newOffsets); + assertThat(receivedRecords.size()) + .describedAs("Received not all sent events") + .isLessThan(100); + + assertThatNoException() + .describedAs("Consumer should not be running") + .isThrownBy(() -> endlessConsumer.exitStatus()); + assertThat(endlessConsumer.exitStatus()) + .describedAs("Consumer should have exited abnormally") + .containsInstanceOf(RecordDeserializationException.class); + } + /** Helper methods for the verification of expectations */ @@ -150,7 +186,7 @@ class ApplicationTests Optional offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset); consumer.accept(tp, offset.orElse(0l)); }); - } + } List partitions() { @@ -217,10 +253,10 @@ class ApplicationTests newOffsets.put(tp, offset - 1); }); - TestRecordHandler captureOffsetAndExecuteTestHandler = - new TestRecordHandler(wordcountRecordHandler) { + TestRecordHandler captureOffsetAndExecuteTestHandler = + new TestRecordHandler(keyCountingRecordHandler) { @Override - public void onNewRecord(ConsumerRecord record) + public void onNewRecord(ConsumerRecord record) { newOffsets.put( new TopicPartition(record.topic(), record.partition()), @@ -235,7 +271,7 @@ class ApplicationTests properties.getClientId(), properties.getTopic(), kafkaConsumer, - wordcountRebalanceListener, + keyCountingRebalanceListener, captureOffsetAndExecuteTestHandler); endlessConsumer.start();