From: Kai Moritz Date: Sat, 23 Apr 2022 08:11:17 +0000 (+0200) Subject: Springify: DLQ für Poison Pills konfiguriert X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=37fce16f77b3833de727060137688ff176d9a577;p=demos%2Fkafka%2Ftraining Springify: DLQ für Poison Pills konfiguriert * Den Producer so konfiguriert, dass er einenen `ByteArraySerializer` verwendet. * Dafür wird der `DefaultErrorHandler` explizit konfiguriert und mit einem `DeadLetterPublishingRecoverer` konfiguriert. * Der `DefaultErrorHandler` ist mit einer `FixedBackOff`-Strategie konfiguriert, die die Nachricht direkt nach dem ersten Fehler auf das DLQ-Topic umleitet. * Damit der Producer die als `byte[]` übergebene fehlerhafte Nachricht serialisieren kann, wurde er mit einem `ByteArraySerializer` konfiguriert. * *TODO:* Auch Exceptions, die in dem `MessageListener` also dem `EndlessConsumer` geworfen werden, werden von dem im `DefaultErrorHandler` konfigurierten `DeadLetterPublishingRecoverer` in die DLQ geschickt, Problem dabei: ** Der Producer kann diese so wie hier konfiguriert noch nicht serialisieren. ** Für diese Fehler wäre aber auch eh eigentlich das Stop-World-Verhalten zu bevorzugen. --- diff --git a/docker-compose.yml b/docker-compose.yml index b03bb1e..317701f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -30,6 +30,8 @@ services: bash -c " kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2 + kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic dlq + kafka-topics --bootstrap-server kafka:9092 --create --topic dlq --partitions 2 " cli: diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 26b81cb..6ab716e 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,11 +1,16 @@ package de.juplo.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; +import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.util.backoff.FixedBackOff; import java.util.function.Consumer; @@ -24,9 +29,19 @@ public class ApplicationConfiguration } @Bean - public ApplicationErrorHandler errorHandler() + public DeadLetterPublishingRecoverer recoverer( + ApplicationProperties properties, + KafkaOperations template) { - return new ApplicationErrorHandler(); + return new DeadLetterPublishingRecoverer( + template, + (record, exception) -> new TopicPartition(properties.getDlqTopic(), record.partition())); + } + + @Bean + public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer) + { + return new DefaultErrorHandler(recoverer, new FixedBackOff(0l, 0l)); } @Bean(destroyMethod = "close") diff --git a/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java b/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java deleted file mode 100644 index 273f509..0000000 --- a/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java +++ /dev/null @@ -1,61 +0,0 @@ -package de.juplo.kafka; - -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler; -import org.springframework.kafka.listener.MessageListenerContainer; - -import java.util.List; -import java.util.Optional; - - -public class ApplicationErrorHandler extends CommonContainerStoppingErrorHandler -{ - private Exception exception; - - - public synchronized Optional getException() - { - return Optional.ofNullable(exception); - } - - public synchronized void clearException() - { - this.exception = null; - } - - - @Override - public void handleOtherException( - Exception thrownException, Consumer consumer, - MessageListenerContainer container, - boolean batchListener) - { - this.exception = thrownException; - super.handleOtherException(thrownException, consumer, container, batchListener); - } - - @Override - public void handleRemaining( - Exception thrownException, - List> records, - Consumer consumer, - MessageListenerContainer container) - { - this.exception = thrownException; - super.handleRemaining(thrownException, records, consumer, container); - } - - @Override - public void handleBatch( - Exception thrownException, - ConsumerRecords data, - Consumer consumer, - MessageListenerContainer container, - Runnable invokeListener) - { - this.exception = thrownException; - super.handleBatch(thrownException, data, consumer, container, invokeListener); - } -} diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java index dc3a26e..742550e 100644 --- a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java +++ b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java @@ -16,17 +16,6 @@ public class ApplicationHealthIndicator implements HealthIndicator @Override public Health health() { - try - { - return consumer - .exitStatus() - .map(Health::down) - .orElse(Health.outOfService()) - .build(); - } - catch (IllegalStateException e) - { - return Health.up().build(); - } + return consumer.isRunning() ? Health.up().build() : Health.down().build(); } } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index c7c4f78..511d68d 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -18,4 +18,7 @@ public class ApplicationProperties @NotNull @NotEmpty private String topic; + @NotNull + @NotEmpty + private String dlqTopic; } diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index 480e7d1..8bc3115 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -10,7 +10,6 @@ import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ExecutionException; @RestController diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 6d0c69d..04a0a3a 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -14,7 +14,6 @@ import org.springframework.stereotype.Component; import java.util.Collection; import java.util.HashMap; import java.util.Map; -import java.util.Optional; import java.util.function.Consumer; @@ -29,8 +28,6 @@ public class EndlessConsumer implements ConsumerAwareRebalanceListener String id; @Autowired Consumer> handler; - @Autowired - ApplicationErrorHandler errorHandler; private long consumed = 0; @@ -113,7 +110,6 @@ public class EndlessConsumer implements ConsumerAwareRebalanceListener throw new IllegalStateException("Consumer instance " + id + " is already running!"); log.info("{} - Starting - consumed {} messages before", id, consumed); - errorHandler.clearException(); registry.getListenerContainer(id).start(); } @@ -127,11 +123,8 @@ public class EndlessConsumer implements ConsumerAwareRebalanceListener log.info("{} - Stopped - consumed {} messages so far", id, consumed); } - public synchronized Optional exitStatus() + public synchronized boolean isRunning() { - if (registry.getListenerContainer(id).isChildRunning()) - throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!"); - - return errorHandler.getException(); + return registry.getListenerContainer(id).isChildRunning(); } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 43cde87..4b22bd2 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,5 +1,6 @@ consumer: topic: test + dlq-topic: dlq management: endpoint: shutdown: @@ -27,10 +28,15 @@ spring: client-id: DEV auto-offset-reset: earliest group-id: my-group - value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer properties: + spring.deserializer.value.delegate.class: "org.springframework.kafka.support.serializer.JsonDeserializer" spring.json.type.mapping: "message:de.juplo.kafka.ClientMessage" spring.json.trusted.packages: "de.juplo.kafka" + producer: + bootstrap-servers: :9092 + client-id: DEV + value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer logging: level: root: INFO diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 3f6a6a8..52ecc0f 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -18,6 +18,7 @@ import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Primary; +import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.TestPropertySource; @@ -48,6 +49,7 @@ import static org.awaitility.Awaitility.*; @TestPropertySource( properties = { "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}", + "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}", "consumer.topic=" + TOPIC }) @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) @Slf4j @@ -91,7 +93,7 @@ class ApplicationTests await("100 records received") .atMost(Duration.ofSeconds(30)) - .until(() -> receivedRecords.size() >= 100); + .until(() -> receivedRecords.size() == 100); await("Offsets committed") .atMost(Duration.ofSeconds(10)) @@ -101,35 +103,42 @@ class ApplicationTests compareToCommitedOffsets(newOffsets); }); - assertThatExceptionOfType(IllegalStateException.class) - .isThrownBy(() -> endlessConsumer.exitStatus()) - .describedAs("Consumer should still be running"); + assertThat(endlessConsumer.isRunning()) + .describedAs("Consumer should still be running") + .isTrue(); } @Test @Order(2) - void commitsOffsetOfErrorForReprocessingOnError() + void commitsCurrentOffsetsOnError() { send100Messages((key, counter) -> counter == 77 ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!")) : serialize(key, counter)); - await("Consumer failed") + await("99 records received") .atMost(Duration.ofSeconds(30)) - .untilAsserted(() -> checkSeenOffsetsForProgress()); - - compareToCommitedOffsets(newOffsets); - assertThat(receivedRecords.size()) - .describedAs("Received not all sent events") - .isLessThan(100); - - assertThatNoException() - .describedAs("Consumer should not be running") - .isThrownBy(() -> endlessConsumer.exitStatus()); - assertThat(endlessConsumer.exitStatus()) - .containsInstanceOf(RecordDeserializationException.class) - .describedAs("Consumer should have exited abnormally"); + .until(() -> receivedRecords.size() == 99); + + await("Offsets committed") + .atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> + { + // UNSCHÖN: + // Funktioniert nur, weil nach der Nachrichten, die den + // Deserialisierungs-Fehler auslöst noch valide Nachrichten + // gelesen werden. + // GRUND: + // Der MessageHandler sieht den Offset der Fehlerhaften + // Nachricht nicht! + checkSeenOffsetsForProgress(); + compareToCommitedOffsets(newOffsets); + }); + + assertThat(endlessConsumer.isRunning()) + .describedAs("Consumer should still be running") + .isTrue(); }