* Diese Version ist kann fachliche Fehler umleiten.
* Für Deserialisierungs-Fehler funktioniert die Umleitung
noch nicht!
#!/bin/bash
-IMAGE=juplo/spring-consumer:1.1-kafkalistener-long-deserialization-error-SNAPSHOT
+IMAGE=juplo/spring-consumer:1.1-kafkalistener-long-dlt-SNAPSHOT
if [ "$1" = "cleanup" ]
then
}
group = 'de.juplo.kafka'
-version = '1.1-kafkalistener-long-deserialization-error-SNAPSHOT'
+version = '1.1-kafkalistener-long-dlt-SNAPSHOT'
java {
toolchain {
echo -n Bereits konfiguriert:
cat INITIALIZED
kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+ kafka-topics --bootstrap-server kafka:9092 --describe --topic test-dlt
else
kafka-topics --bootstrap-server kafka:9092 \
--delete \
--config min.insync.replicas=2 \
&& echo Das Topic \'test\' wurde erfolgreich angelegt: \
&& kafka-topics --bootstrap-server kafka:9092 --describe --topic test \
+ && kafka-topics --bootstrap-server kafka:9092 \
+ --delete \
+ --if-exists \
+ --topic test-dlt
+ kafka-topics --bootstrap-server kafka:9092 \
+ --create \
+ --topic test-dlt \
+ --partitions 2 \
+ --replication-factor 3 \
+ --config min.insync.replicas=2 \
+ && echo Das Topic \'test-dlt\' wurde erfolgreich angelegt: \
+ && kafka-topics --bootstrap-server kafka:9092 --describe --topic test-dlt \
&& date > INITIALIZED
fi
stop_grace_period: 0s
juplo.producer.topic: test
consumer:
- image: juplo/spring-consumer:1.1-kafkalistener-long-deserialization-error-SNAPSHOT
+ image: juplo/spring-consumer:1.1-kafkalistener-long-dlt-SNAPSHOT
environment:
spring.kafka.bootstrap-servers: kafka:9092
spring.kafka.client-id: consumer
juplo.consumer.topic: test
peter:
- image: juplo/spring-consumer:1.1-kafkalistener-long-deserialization-error-SNAPSHOT
+ image: juplo/spring-consumer:1.1-kafkalistener-long-dlt-SNAPSHOT
environment:
spring.kafka.bootstrap-servers: kafka:9092
spring.kafka.client-id: peter
juplo.consumer.topic: test
ute:
- image: juplo/spring-consumer:1.1-kafkalistener-long-deserialization-error-SNAPSHOT
+ image: juplo/spring-consumer:1.1-kafkalistener-long-dlt-SNAPSHOT
environment:
spring.kafka.bootstrap-servers: kafka:9092
spring.kafka.client-id: ute
<artifactId>spring-consumer</artifactId>
<name>Spring Consumer</name>
<description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
- <version>1.1-kafkalistener-long-deserialization-error-SNAPSHOT</version>
+ <version>1.1-kafkalistener-long-dlt-SNAPSHOT</version>
<properties>
<java.version>21</java.version>
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
@SpringBootApplication
+@EnableConfigurationProperties(KafkaProperties.class)
public class Application
{
public static void main(String[] args)
--- /dev/null
+package de.juplo.kafka;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.KafkaOperations;
+import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.kafka.listener.SeekUtils;
+
+
+@Configuration
+public class ApplicationConfiguration
+{
+ @Bean
+ public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaOperations<?, ?> kafkaTemplate)
+ {
+ return new DeadLetterPublishingRecoverer(kafkaTemplate);
+ }
+
+ @Bean
+ public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer)
+ {
+ return new DefaultErrorHandler(
+ recoverer,
+ SeekUtils.DEFAULT_BACK_OFF);
+ }
+}
String key,
Long value)
{
+ if (value % 66 == 0) throw new RuntimeException("BOOM: Fachlicher Fehler!");
consumed++;
log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
}
group-id: my-group
properties:
"[spring.deserializer.value.delegate.class]": org.apache.kafka.common.serialization.LongDeserializer
+ producer:
+ value-serializer: org.apache.kafka.common.serialization.LongSerializer
logging:
level:
root: INFO
import java.time.Duration;
import static de.juplo.kafka.ApplicationTests.PARTITIONS;
+import static de.juplo.kafka.ApplicationTests.DLT;
import static de.juplo.kafka.ApplicationTests.TOPIC;
import static org.awaitility.Awaitility.await;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
"juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
"juplo.consumer.topic=" + TOPIC })
@AutoConfigureMockMvc
-@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
+@EmbeddedKafka(topics = { TOPIC, DLT }, partitions = PARTITIONS)
public class ApplicationTests
{
static final String TOPIC = "FOO";
+ public static final String DLT = TOPIC + "-dlt";
static final int PARTITIONS = 10;
@Autowired