From: Kai Moritz <kai@juplo.de> Date: Sun, 24 Jul 2022 15:18:33 +0000 (+0200) Subject: Umstellung des Nachrichten-Datentyps auf Long zurückgenommen X-Git-Tag: endless-stream-consumer-DEPRECATED^2^2^2~1^2~4^2~5 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=60bc4a251dc9bab71d5ab5f12870147fec253ac9;p=demos%2Fkafka%2Ftraining Umstellung des Nachrichten-Datentyps auf Long zurückgenommen * Im Branch 'deserialization' wurde der Datentyp der Nachricht von `String` auf `Long` umgestellt, um eine `DeserializationException` vorzuführen, die innerhalb des Kafka-Codes geworfen wird. * Diese Änderung wurde schon dort nicht in dem `README.sh`-Skript reflektiert. * Hier stört sie jetzt die Experimente mit dem `EndlessProducer`, der Nachrichten vom Typ `String` erzeugt, so dass der Consumer kein einzige Nachricht annehmen kann. * Daher wird der Nachrichten-Datentyp hier wieder auf `String` zurück umgestellt. * Dafür musste auch der Testfall angepasst und der Test entfernt werden, der die Exception kontrolliert. --- diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 9b06b093..08c39554 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -2,7 +2,6 @@ package de.juplo.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; @@ -20,7 +19,7 @@ import java.util.function.Consumer; public class ApplicationConfiguration { @Bean - public Consumer<ConsumerRecord<String, Long>> consumer() + public Consumer<ConsumerRecord<String, String>> consumer() { return (record) -> { @@ -29,10 +28,10 @@ public class ApplicationConfiguration } @Bean - public EndlessConsumer<String, Long> endlessConsumer( - KafkaConsumer<String, Long> kafkaConsumer, + public EndlessConsumer<String, String> endlessConsumer( + KafkaConsumer<String, String> kafkaConsumer, ExecutorService executor, - Consumer<ConsumerRecord<String, Long>> handler, + Consumer<ConsumerRecord<String, String>> handler, PartitionStatisticsRepository repository, ApplicationProperties properties) { @@ -55,7 +54,7 @@ public class ApplicationConfiguration } @Bean(destroyMethod = "close") - public KafkaConsumer<String, Long> kafkaConsumer(ApplicationProperties properties) + public KafkaConsumer<String, String> kafkaConsumer(ApplicationProperties properties) { Properties props = new Properties(); @@ -67,7 +66,7 @@ public class ApplicationConfiguration props.put("auto.offset.reset", properties.getAutoOffsetReset()); props.put("metadata.max.age.ms", "1000"); props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", LongDeserializer.class.getName()); + props.put("value.deserializer", StringDeserializer.class.getName()); return new KafkaConsumer<>(props); } diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java index dc3a26ec..df4e653c 100644 --- a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java +++ b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java @@ -10,7 +10,7 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor public class ApplicationHealthIndicator implements HealthIndicator { - private final EndlessConsumer<String, Long> consumer; + private final EndlessConsumer<String, String> consumer; @Override diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index ce5dd723..f9a96292 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -236,7 +236,7 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl } } - public synchronized void stop() throws ExecutionException, InterruptedException + public synchronized void stop() throws InterruptedException { lock.lock(); try diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 431431bb..ca72e3c5 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -6,7 +6,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.serialization.*; import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.*; @@ -63,7 +62,7 @@ class ApplicationTests @Autowired KafkaProducer<String, Bytes> kafkaProducer; @Autowired - KafkaConsumer<String, Long> kafkaConsumer; + KafkaConsumer<String, String> kafkaConsumer; @Autowired PartitionStatisticsRepository partitionStatisticsRepository; @Autowired @@ -73,17 +72,16 @@ class ApplicationTests @Autowired PartitionStatisticsRepository repository; - Consumer<ConsumerRecord<String, Long>> testHandler; - EndlessConsumer<String, Long> endlessConsumer; + Consumer<ConsumerRecord<String, String>> testHandler; + EndlessConsumer<String, String> endlessConsumer; Map<TopicPartition, Long> oldOffsets; Map<TopicPartition, Long> newOffsets; - Set<ConsumerRecord<String, Long>> receivedRecords; + Set<ConsumerRecord<String, String>> receivedRecords; /** Tests methods */ @Test - @Order(1) // << The poistion pill is not skipped. Hence, this test must run first void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException { send100Messages(i -> new Bytes(valueSerializer.serialize(TOPIC, i))); @@ -105,41 +103,6 @@ class ApplicationTests .describedAs("Consumer should still be running"); } - @Test - @Order(2) - void commitsOffsetOfErrorForReprocessingOnError() - { - send100Messages(counter -> - counter == 77 - ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!")) - : new Bytes(valueSerializer.serialize(TOPIC, counter))); - - await("Consumer failed") - .atMost(Duration.ofSeconds(30)) - .until(() -> !endlessConsumer.running()); - - checkSeenOffsetsForProgress(); - compareToCommitedOffsets(newOffsets); - - endlessConsumer.start(); - await("Consumer failed") - .atMost(Duration.ofSeconds(30)) - .until(() -> !endlessConsumer.running()); - - checkSeenOffsetsForProgress(); - compareToCommitedOffsets(newOffsets); - assertThat(receivedRecords.size()) - .describedAs("Received not all sent events") - .isLessThan(100); - - assertThatNoException() - .describedAs("Consumer should not be running") - .isThrownBy(() -> endlessConsumer.exitStatus()); - assertThat(endlessConsumer.exitStatus()) - .describedAs("Consumer should have exited abnormally") - .containsInstanceOf(RecordDeserializationException.class); - } - /** Helper methods for the verification of expectations */ @@ -254,7 +217,7 @@ class ApplicationTests newOffsets.put(tp, offset - 1); }); - Consumer<ConsumerRecord<String, Long>> captureOffsetAndExecuteTestHandler = + Consumer<ConsumerRecord<String, String>> captureOffsetAndExecuteTestHandler = record -> { newOffsets.put(