From ac154bb18a6c575fe01e70cba6a86d10580dfb89 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 10 Sep 2022 20:41:06 +0200 Subject: [PATCH] DLT auf Basis des `DeadLetterPublishingRecoverer` konfiguriert MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Der `DeadLetterPublishingRecoverer` muss explizit instanziiert werden. * Um ihm den Spring-Kafka-Beans bekannt zu machen, muss die `DefaultErrorHandler`-Bean überschrieben werden. * Der Recoverer wird dem Handler zusammen mit einer BackOff-Strategie übergeben. * Damit der `DeadLetterPublishingRecoverer` die weiterzuleitenden Nachrichten senden kann, muss * Der Producer benötigt scheinbar einen separaten Eintrag für `bootstrap-servers` unter `spring.kafka.producer`. Der Eintrag `spring.kafa.bootstrap-servers` wird hier nicht übernommen! --- docker-compose.yml | 5 ++ .../juplo/kafka/ApplicationConfiguration.java | 49 ++++++++++++++ src/main/resources/application.yml | 9 ++- .../juplo/kafka/GenericApplicationTests.java | 64 +++++++++---------- 4 files changed, 93 insertions(+), 34 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 16fec5b..960bbc2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -85,10 +85,13 @@ services: bash -c " kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic in kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic out + kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic out.DLT kafka-topics --bootstrap-server kafka:9092 --create --topic in --partitions 2 --replication-factor 3 --config min.insync.replicas=2 kafka-topics --bootstrap-server kafka:9092 --create --topic out --partitions 2 --replication-factor 3 --config min.insync.replicas=2 + kafka-topics --bootstrap-server kafka:9092 --create --topic out.DLT --partitions 2 --replication-factor 3 --config min.insync.replicas=2 kafka-topics --bootstrap-server kafka:9092 --describe --topic in kafka-topics --bootstrap-server kafka:9092 --describe --topic out + kafka-topics --bootstrap-server kafka:9092 --describe --topic out.DLT " cli: @@ -130,6 +133,7 @@ services: environment: server.port: 8080 spring.kafka.bootstrap-servers: kafka:9092 + spring.kafka.producer.bootstrap-servers: kafka:9092 spring.kafak.client-id: adder-1 spring.kafka.auto-commit-interval: 1s sumup.adder.throttle: 3ms @@ -144,6 +148,7 @@ services: environment: server.port: 8080 spring.kafka.bootstrap-servers: kafka:9092 + spring.kafka.producer.bootstrap-servers: kafka:9092 spring.kafak.client-id: adder-2 spring.kafka.auto-commit-interval: 1s sumup.adder.throttle: 3ms diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index c09eec3..b5f6187 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,13 +1,25 @@ package de.juplo.kafka; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.util.Map; import java.util.Optional; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; +import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.util.backoff.FixedBackOff; @Configuration @@ -58,4 +70,41 @@ public class ApplicationConfiguration endpointRegistry, recordHandler); } + + @Bean + public ProducerFactory producerFactory( + KafkaProperties properties) + { + return new DefaultKafkaProducerFactory<>( + properties.getProducer().buildProperties(), + new StringSerializer(), + new DelegatingByTypeSerializer( + Map.of( + byte[].class, new ByteArraySerializer(), + MessageAddNumber.class, new JsonSerializer<>(), + MessageCalculateSum.class, new JsonSerializer<>()))); + } + + @Bean + public KafkaTemplate kafkaTemplate( + ProducerFactory producerFactory) + { + return new KafkaTemplate<>(producerFactory); + } + + @Bean + public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer( + KafkaOperations kafkaTemplate) + { + return new DeadLetterPublishingRecoverer(kafkaTemplate); + } + + @Bean + public DefaultErrorHandler errorHandler( + DeadLetterPublishingRecoverer recoverer) + { + return new DefaultErrorHandler( + recoverer, + new FixedBackOff(0l, 0l)); + } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 92f3a6b..0bc592c 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -34,10 +34,17 @@ spring: auto-offset-reset: earliest auto-commit-interval: 5s key-deserializer: org.apache.kafka.common.serialization.StringDeserializer - value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer properties: partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor metadata.max.age.ms: 1000 + spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer + spring.json.type.mapping: > + ADD:de.juplo.kafka.MessageAddNumber, + CALC:de.juplo.kafka.MessageCalculateSum + producer: + bootstrap-servers: :9092 + properties: spring.json.type.mapping: > ADD:de.juplo.kafka.MessageAddNumber, CALC:de.juplo.kafka.MessageCalculateSum diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 4793d96..b98066f 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -6,7 +6,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.serialization.*; import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.*; @@ -42,6 +41,7 @@ import static org.awaitility.Awaitility.*; @TestPropertySource( properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}", "sumup.adder.topic=" + TOPIC, "spring.kafka.consumer.auto-commit-interval=500ms", "spring.mongodb.embedded.version=4.4.13" }) @@ -124,32 +124,29 @@ abstract class GenericApplicationTests { recordGenerator.generate(true, false, messageSender); - int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages(); + int numberOfValidMessages = + recordGenerator.getNumberOfMessages() - + recordGenerator.getNumberOfPoisonPills(); - await("Consumer failed") + await(numberOfValidMessages + " records received") .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); - - checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); + .until(() -> recordHandler.receivedMessages >= numberOfValidMessages); - endlessConsumer.start(); - await("Consumer failed") - .atMost(Duration.ofSeconds(30)) + await("Offsets committed") + .atMost(Duration.ofSeconds(10)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); - - checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); - assertThat(recordHandler.receivedMessages) - .describedAs("Received not all sent events") - .isLessThan(numberOfGeneratedMessages); + .untilAsserted(() -> + { + checkSeenOffsetsForProgress(); + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); + }); assertThat(endlessConsumer.running()) - .describedAs("Consumer should have exited") - .isFalse(); + .describedAs("Consumer should still be running") + .isTrue(); + endlessConsumer.stop(); recordGenerator.assertBusinessLogic(); } @@ -159,28 +156,29 @@ abstract class GenericApplicationTests { recordGenerator.generate(false, true, messageSender); - int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages(); + int numberOfValidMessages = + recordGenerator.getNumberOfMessages() - + recordGenerator.getNumberOfLogicErrors(); - await("Consumer failed") + await(numberOfValidMessages + " records received") .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); - - checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); + .until(() -> recordHandler.receivedMessages >= numberOfValidMessages); - endlessConsumer.start(); - await("Consumer failed") - .atMost(Duration.ofSeconds(30)) + await("Offsets committed") + .atMost(Duration.ofSeconds(10)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); - - assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); + .untilAsserted(() -> + { + checkSeenOffsetsForProgress(); + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); + }); assertThat(endlessConsumer.running()) - .describedAs("Consumer should not be running") - .isFalse(); + .describedAs("Consumer should still be running") + .isTrue(); + endlessConsumer.stop(); recordGenerator.assertBusinessLogic(); } -- 2.20.1