From 60bc4a251dc9bab71d5ab5f12870147fec253ac9 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 24 Jul 2022 17:18:33 +0200
Subject: [PATCH] Revert the switch of the message data type to Long
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* In the 'deserialization' branch, the data type of the messages was changed
  from `String` to `Long` in order to demonstrate a `DeserializationException`
  that is thrown inside the Kafka code.
* Even there, this change was not reflected in the `README.sh` script.
* Here, it now interferes with the experiments with the `EndlessProducer`,
  which produces messages of type `String`, so the consumer cannot accept a
  single message.
* Therefore, the message data type is switched back to `String` here.
* For this, the test case also had to be adjusted, and the test that checks
  for the exception had to be removed.
---
Note (not part of the commit): a minimal sketch of the matching
serializer/deserializer setup follows after the patch.

 .../juplo/kafka/ApplicationConfiguration.java | 13 +++--
 .../kafka/ApplicationHealthIndicator.java     |  2 +-
 .../java/de/juplo/kafka/EndlessConsumer.java  |  2 +-
 .../java/de/juplo/kafka/ApplicationTests.java | 47 ++-----
 4 files changed, 13 insertions(+), 51 deletions(-)

diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 9b06b09..08c3955 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -2,7 +2,6 @@ package de.juplo.kafka;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
@@ -20,7 +19,7 @@ import java.util.function.Consumer;
 public class ApplicationConfiguration
 {
   @Bean
-  public Consumer<ConsumerRecord<String, Long>> consumer()
+  public Consumer<ConsumerRecord<String, String>> consumer()
   {
     return (record) ->
     {
@@ -29,10 +28,10 @@ public class ApplicationConfiguration
   }
 
   @Bean
-  public EndlessConsumer<String, Long> endlessConsumer(
-      KafkaConsumer<String, Long> kafkaConsumer,
+  public EndlessConsumer<String, String> endlessConsumer(
+      KafkaConsumer<String, String> kafkaConsumer,
       ExecutorService executor,
-      Consumer<ConsumerRecord<String, Long>> handler,
+      Consumer<ConsumerRecord<String, String>> handler,
       PartitionStatisticsRepository repository,
       ApplicationProperties properties)
   {
@@ -55,7 +54,7 @@ public class ApplicationConfiguration
   }
 
   @Bean(destroyMethod = "close")
-  public KafkaConsumer<String, Long> kafkaConsumer(ApplicationProperties properties)
+  public KafkaConsumer<String, String> kafkaConsumer(ApplicationProperties properties)
   {
     Properties props = new Properties();
 
@@ -67,7 +66,7 @@ public class ApplicationConfiguration
     props.put("auto.offset.reset", properties.getAutoOffsetReset());
     props.put("metadata.max.age.ms", "1000");
     props.put("key.deserializer", StringDeserializer.class.getName());
-    props.put("value.deserializer", LongDeserializer.class.getName());
+    props.put("value.deserializer", StringDeserializer.class.getName());
 
     return new KafkaConsumer<>(props);
   }
diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
index dc3a26e..df4e653 100644
--- a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
+++ b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
@@ -10,7 +10,7 @@ import org.springframework.stereotype.Component;
 @RequiredArgsConstructor
 public class ApplicationHealthIndicator implements HealthIndicator
 {
-  private final EndlessConsumer<String, Long> consumer;
+  private final EndlessConsumer<String, String> consumer;
 
 
   @Override
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index ce5dd72..f9a9629 100644
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -236,7 +236,7 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
     }
   }
 
-  public synchronized void stop() throws ExecutionException, InterruptedException
+  public synchronized void stop() throws InterruptedException
   {
     lock.lock();
     try
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index 431431b..ca72e3c 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -6,7 +6,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.serialization.*;
 import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.*;
@@ -63,7 +62,7 @@ class ApplicationTests
 	@Autowired
 	KafkaProducer<String, Bytes> kafkaProducer;
 	@Autowired
-	KafkaConsumer<String, Long> kafkaConsumer;
+	KafkaConsumer<String, String> kafkaConsumer;
 	@Autowired
 	PartitionStatisticsRepository partitionStatisticsRepository;
 	@Autowired
@@ -73,17 +72,16 @@ class ApplicationTests
 	@Autowired
 	PartitionStatisticsRepository repository;
 
-	Consumer<ConsumerRecord<String, Long>> testHandler;
-	EndlessConsumer<String, Long> endlessConsumer;
+	Consumer<ConsumerRecord<String, String>> testHandler;
+	EndlessConsumer<String, String> endlessConsumer;
 	Map<TopicPartition, Long> oldOffsets;
 	Map<TopicPartition, Long> newOffsets;
-	Set<ConsumerRecord<String, Long>> receivedRecords;
+	Set<ConsumerRecord<String, String>> receivedRecords;
 
 
 	/** Tests methods */
 
 	@Test
-	@Order(1) // << The poistion pill is not skipped. Hence, this test must run first
 	void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException
 	{
 		send100Messages(i -> new Bytes(valueSerializer.serialize(TOPIC, i)));
@@ -105,41 +103,6 @@ class ApplicationTests
 			.describedAs("Consumer should still be running");
 	}
 
-	@Test
-	@Order(2)
-	void commitsOffsetOfErrorForReprocessingOnError()
-	{
-		send100Messages(counter ->
-				counter == 77
-						? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
-						: new Bytes(valueSerializer.serialize(TOPIC, counter)));
-
-		await("Consumer failed")
-				.atMost(Duration.ofSeconds(30))
-				.until(() -> !endlessConsumer.running());
-
-		checkSeenOffsetsForProgress();
-		compareToCommitedOffsets(newOffsets);
-
-		endlessConsumer.start();
-		await("Consumer failed")
-				.atMost(Duration.ofSeconds(30))
-				.until(() -> !endlessConsumer.running());
-
-		checkSeenOffsetsForProgress();
-		compareToCommitedOffsets(newOffsets);
-		assertThat(receivedRecords.size())
-				.describedAs("Received not all sent events")
-				.isLessThan(100);
-
-		assertThatNoException()
-				.describedAs("Consumer should not be running")
-				.isThrownBy(() -> endlessConsumer.exitStatus());
-		assertThat(endlessConsumer.exitStatus())
-				.describedAs("Consumer should have exited abnormally")
-				.containsInstanceOf(RecordDeserializationException.class);
-	}
-
 
 
 	/** Helper methods for the verification of expectations */
@@ -254,7 +217,7 @@ class ApplicationTests
 			newOffsets.put(tp, offset - 1);
 		});
 
-		Consumer<ConsumerRecord<String, Long>> captureOffsetAndExecuteTestHandler =
+		Consumer<ConsumerRecord<String, String>> captureOffsetAndExecuteTestHandler =
 				record ->
 				{
 					newOffsets.put(
-- 
2.20.1
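
A minimal, hypothetical sketch (not part of the patch) of the point the commit
message makes: the consumer's value deserializer has to match the producer's
value serializer. The topic name, broker address, group id, and the class name
SerializationRoundTrip are made-up placeholders; only the Kafka client API
calls themselves are real.

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class SerializationRoundTrip
{
  public static void main(String[] args)
  {
    String topic = "test";                     // placeholder topic
    String bootstrapServer = "localhost:9092"; // placeholder broker

    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", bootstrapServer);
    producerProps.put("key.serializer", StringSerializer.class.getName());
    producerProps.put("value.serializer", StringSerializer.class.getName());

    // The producer writes String values, like the EndlessProducer does ...
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps))
    {
      producer.send(new ProducerRecord<>(topic, "key", "hello"));
    }

    Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", bootstrapServer);
    consumerProps.put("group.id", "example");
    consumerProps.put("auto.offset.reset", "earliest");
    consumerProps.put("key.deserializer", StringDeserializer.class.getName());
    // ... so the consumer must deserialize the values as String as well.
    // With a LongDeserializer here, poll() would fail with a
    // RecordDeserializationException, because the String payloads are
    // generally not valid 8-byte Long values.
    consumerProps.put("value.deserializer", StringDeserializer.class.getName());

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps))
    {
      consumer.subscribe(List.of(topic));
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
      records.forEach(record -> System.out.println(record.value()));
    }
  }
}

The patch restores exactly this pairing in ApplicationConfiguration.java by
switching the value.deserializer property back to StringDeserializer.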