X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;ds=sidebyside;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;h=09614b8e23702f729214dd33bc7820ad3c83208d;hb=c808810e9e33afe33b29f7fd3921023ecd15483d;hp=fc5d4c9e51d21348a9271fad7877bbd07b9ed0fe;hpb=2da45caa1f9d32e3a5506d71cce7f06fa2e36523;p=demos%2Fkafka%2Ftraining

diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index fc5d4c9..09614b8 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -6,7 +6,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.serialization.*;
 import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.*;
@@ -60,7 +59,7 @@ class ApplicationTests
   @Autowired
   KafkaProducer<String, Bytes> kafkaProducer;
   @Autowired
-  KafkaConsumer<String, Long> kafkaConsumer;
+  KafkaConsumer<String, String> kafkaConsumer;
   @Autowired
   KafkaConsumer<Bytes, Bytes> offsetConsumer;
   @Autowired
@@ -72,14 +71,14 @@ class ApplicationTests
   @Autowired
   PartitionStatisticsRepository repository;
   @Autowired
-  KeyCountingRebalanceListener keyCountingRebalanceListener;
+  SumRebalanceListener sumRebalanceListener;
   @Autowired
-  KeyCountingRecordHandler keyCountingRecordHandler;
+  SumRecordHandler sumRecordHandler;
 
-  EndlessConsumer<String, Long> endlessConsumer;
+  EndlessConsumer<String, String> endlessConsumer;
   Map<TopicPartition, Long> oldOffsets;
   Map<TopicPartition, Long> newOffsets;
-  Set<ConsumerRecord<String, Long>> receivedRecords;
+  Set<ConsumerRecord<String, String>> receivedRecords;
 
 
   /** Tests methods */
@@ -112,45 +111,6 @@ class ApplicationTests
       .describedAs("Consumer should still be running");
   }
 
-  @Test
-  void commitsOffsetOfErrorForReprocessingOnDeserializationError()
-  {
-    send100Messages((partition, key, counter) ->
-    {
-      Bytes value = counter == 77
-          ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
-          : new Bytes(valueSerializer.serialize(TOPIC, counter));
-      return new ProducerRecord<>(TOPIC, partition, key, value);
-    });
-
-    await("Consumer failed")
-        .atMost(Duration.ofSeconds(30))
-        .pollInterval(Duration.ofSeconds(1))
-        .until(() -> !endlessConsumer.running());
-
-    checkSeenOffsetsForProgress();
-    compareToCommitedOffsets(newOffsets);
-
-    endlessConsumer.start();
-    await("Consumer failed")
-        .atMost(Duration.ofSeconds(30))
-        .pollInterval(Duration.ofSeconds(1))
-        .until(() -> !endlessConsumer.running());
-
-    checkSeenOffsetsForProgress();
-    compareToCommitedOffsets(newOffsets);
-    assertThat(receivedRecords.size())
-        .describedAs("Received not all sent events")
-        .isLessThan(100);
-
-    assertThatNoException()
-        .describedAs("Consumer should not be running")
-        .isThrownBy(() -> endlessConsumer.exitStatus());
-    assertThat(endlessConsumer.exitStatus())
-        .describedAs("Consumer should have exited abnormally")
-        .containsInstanceOf(RecordDeserializationException.class);
-  }
-
 
 /** Helper methods for the verification of expectations */
 
@@ -214,7 +174,7 @@ class ApplicationTests
       Optional<Long> offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset);
       consumer.accept(tp, offset.orElse(0l));
     });
-  }
+  }
 
   List<TopicPartition> partitions()
   {
@@ -282,10 +242,10 @@ class ApplicationTests
       newOffsets.put(tp, offset - 1);
     });
 
-    TestRecordHandler<String, Long> captureOffsetAndExecuteTestHandler =
-        new TestRecordHandler<String, Long>(keyCountingRecordHandler) {
+    TestRecordHandler<String, String> captureOffsetAndExecuteTestHandler =
+        new TestRecordHandler<String, String>(sumRecordHandler) {
           @Override
-          public void onNewRecord(ConsumerRecord<String, Long> record)
+          public void onNewRecord(ConsumerRecord<String, String> record)
           {
             newOffsets.put(
                 new TopicPartition(record.topic(), record.partition()),
@@ -300,7 +260,7 @@ class ApplicationTests
         properties.getClientId(),
         properties.getTopic(),
         kafkaConsumer,
-        keyCountingRebalanceListener,
+        sumRebalanceListener,
         captureOffsetAndExecuteTestHandler);
 
     endlessConsumer.start();
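
Note on the wiring in the last two hunks: the test wraps the production handler in an
anonymous TestRecordHandler subclass whose onNewRecord() captures the offset of every
consumed record before the record is processed, so the test can later compare the
offsets it has seen against the offsets the consumer actually committed. A
self-contained sketch of that delegation pattern follows; TestRecordHandler and
onNewRecord() are taken from the diff, while the RecordHandler interface shown here is
a hypothetical simplification of the repository's callback type, not its actual
definition:

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

// Assumption: the repository's RecordHandler<K, V> is roughly the per-record
// callback invoked by EndlessConsumer; it is modeled here as a Consumer.
interface RecordHandler<K, V> extends Consumer<ConsumerRecord<K, V>> {}

// Delegating wrapper, used anonymously in the diff: it notes the offset of
// each record via onNewRecord() and then forwards the record unchanged to
// the real handler.
class TestRecordHandler<K, V> implements RecordHandler<K, V>
{
  private final RecordHandler<K, V> handler;

  final Map<TopicPartition, Long> seenOffsets = new HashMap<>();

  TestRecordHandler(RecordHandler<K, V> handler)
  {
    this.handler = handler;
  }

  public void onNewRecord(ConsumerRecord<K, V> record)
  {
    // Remember the highest offset seen per partition for later verification
    seenOffsets.put(
        new TopicPartition(record.topic(), record.partition()),
        record.offset());
  }

  @Override
  public void accept(ConsumerRecord<K, V> record)
  {
    onNewRecord(record);
    handler.accept(record);
  }
}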