From: Kai Moritz Date: Sun, 20 Nov 2022 15:26:06 +0000 (+0100) Subject: DLT-Konfig für `spring-consumer`, die auch mit Poison Pills umgehen kann X-Git-Tag: spring-consumer--json--adder--kafkalistener--dlt---lvm-2-tage--easy-path X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=2eb00a6cce8be4588119915ed9623fbd1bab39d7;p=demos%2Fkafka%2Ftraining DLT-Konfig für `spring-consumer`, die auch mit Poison Pills umgehen kann * Damit der Producer, der die Nachrichten in das Dead-Letter-Topic schreibt, sowohl mit Deserialisierten Nachriten umgehen kann, die über den JsonSerializer zu serialisieren sind, als auch mit Poison Pills, die unverändert als `byte[]` zu schreiben sind, muss ein `DelegatingByTypeSerializer` konfiguriert werden. * Dieser erwartet das Mapping im Konstruktor als Map und kann daher nicht (oder nicht so leicht?) über die `application.yml` konfiguriert werden! --- diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index f26d7a2..0360f68 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -1,20 +1,53 @@ package de.juplo.kafka; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer; +import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.util.backoff.FixedBackOff; +import java.util.Map; + @SpringBootApplication @EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class }) public class Application { + @Bean + public ProducerFactory producerFactory( + KafkaProperties properties) + { + Map map = Map.of( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers()); + return new DefaultKafkaProducerFactory<>( + map, + new StringSerializer(), + new DelegatingByTypeSerializer( + Map.of( + byte[].class, new ByteArraySerializer(), + MessageAddNumber.class, new JsonSerializer<>(), + MessageCalculateSum.class, new JsonSerializer<>()))); + } + + @Bean + public KafkaTemplate kafkaTemplate( + ProducerFactory producerFactory) + { + return new KafkaTemplate<>(producerFactory); + } + @Bean public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer( KafkaOperations kafkaTemplate) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 61a3f85..17e94ad 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -38,9 +38,6 @@ spring: spring.json.type.mapping: > ADD:de.juplo.kafka.MessageAddNumber, CALC:de.juplo.kafka.MessageCalculateSum - producer: - key-serializer: org.apache.kafka.common.serialization.StringSerializer - value-serializer: org.springframework.kafka.support.serializer.JsonSerializer logging: level: root: INFO