]> juplo.de Git - demos/kafka/training/commitdiff
Version des `spring-consumer` mit einem Dead-Letter-Topic
authorKai Moritz <kai@juplo.de>
Sun, 20 Nov 2022 14:40:35 +0000 (15:40 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 20 Mar 2026 20:05:36 +0000 (21:05 +0100)
* Diese Version ist kann fachliche Fehler umleiten.
* Für Deserialisierungs-Fehler funktioniert die Umleitung
  noch nicht!

README.sh
build.gradle
docker/docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ApplicationConfiguration.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ExampleConsumer.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationTests.java

index ba1f88e323f875c89271cc2db23da8c0a291a3ae..8d028797073adbe814f85278ae54a9a07575691f 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/spring-consumer:1.1-kafkalistener-long-deserialization-error-SNAPSHOT
+IMAGE=juplo/spring-consumer:1.1-kafkalistener-long-dlt-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
index 33d144893eedc38ea6cde0aca5b9e0be53a4dcb3..99594abff1c188fb0110e0d69c3ea11bed48389d 100644 (file)
@@ -8,7 +8,7 @@ plugins {
 }
 
 group = 'de.juplo.kafka'
-version = '1.1-kafkalistener-long-deserialization-error-SNAPSHOT'
+version = '1.1-kafkalistener-long-dlt-SNAPSHOT'
 
 java {
        toolchain {
index e570b6bd7670035c72f3922baec213c69d7a11a0..bfe2225511423b0ef43551ac6ae78c8dccbddb21 100644 (file)
@@ -129,6 +129,7 @@ services:
           echo -n Bereits konfiguriert: 
           cat INITIALIZED
           kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+          kafka-topics --bootstrap-server kafka:9092 --describe --topic test-dlt
         else
           kafka-topics --bootstrap-server kafka:9092 \
                        --delete \
@@ -142,6 +143,18 @@ services:
                        --config min.insync.replicas=2 \
           && echo Das Topic \'test\' wurde erfolgreich angelegt: \
           && kafka-topics --bootstrap-server kafka:9092 --describe --topic test \
+          && kafka-topics --bootstrap-server kafka:9092 \
+                       --delete \
+                       --if-exists \
+                       --topic test-dlt
+          kafka-topics --bootstrap-server kafka:9092 \
+                       --create \
+                       --topic test-dlt \
+                       --partitions 2 \
+                       --replication-factor 3 \
+                       --config min.insync.replicas=2 \
+          && echo Das Topic \'test-dlt\' wurde erfolgreich angelegt: \
+          && kafka-topics --bootstrap-server kafka:9092 --describe --topic test-dlt \
           && date > INITIALIZED
         fi
     stop_grace_period: 0s
@@ -183,7 +196,7 @@ services:
     mem_limit:  100m
 
   consumer:
-    image: juplo/spring-consumer:1.1-kafkalistener-long-deserialization-error-SNAPSHOT
+    image: juplo/spring-consumer:1.1-kafkalistener-long-dlt-SNAPSHOT
     environment:
       spring.kafka.bootstrap-servers: kafka:9092
       spring.kafka.client-id: consumer
@@ -192,7 +205,7 @@ services:
       juplo.consumer.topic: test
 
   peter:
-    image: juplo/spring-consumer:1.1-kafkalistener-long-deserialization-error-SNAPSHOT
+    image: juplo/spring-consumer:1.1-kafkalistener-long-dlt-SNAPSHOT
     environment:
       spring.kafka.bootstrap-servers: kafka:9092
       spring.kafka.client-id: peter
@@ -201,7 +214,7 @@ services:
       juplo.consumer.topic: test
 
   ute:
-    image: juplo/spring-consumer:1.1-kafkalistener-long-deserialization-error-SNAPSHOT
+    image: juplo/spring-consumer:1.1-kafkalistener-long-dlt-SNAPSHOT
     environment:
       spring.kafka.bootstrap-servers: kafka:9092
       spring.kafka.client-id: ute
diff --git a/pom.xml b/pom.xml
index 91d4304b001f2fffd78a17b3d951dc3276a6c2fc..004d14f58fb1de6214849b96ffd2d9e2e7332023 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -15,7 +15,7 @@
   <artifactId>spring-consumer</artifactId>
   <name>Spring Consumer</name>
   <description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
-  <version>1.1-kafkalistener-long-deserialization-error-SNAPSHOT</version>
+  <version>1.1-kafkalistener-long-dlt-SNAPSHOT</version>
 
   <properties>
     <java.version>21</java.version>
index 0069257fe27025546c1f89a8124c9114ac88f830..560de603ebb3f5e7872924bf5b10994ca883a6d4 100644 (file)
@@ -2,9 +2,12 @@ package de.juplo.kafka;
 
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
 
 
 @SpringBootApplication
+@EnableConfigurationProperties(KafkaProperties.class)
 public class Application
 {
   public static void main(String[] args)
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
new file mode 100644 (file)
index 0000000..0f18338
--- /dev/null
@@ -0,0 +1,27 @@
+package de.juplo.kafka;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.KafkaOperations;
+import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.kafka.listener.SeekUtils;
+
+
+@Configuration
+public class ApplicationConfiguration
+{
+  @Bean
+  public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaOperations<?, ?> kafkaTemplate)
+  {
+    return new DeadLetterPublishingRecoverer(kafkaTemplate);
+  }
+
+  @Bean
+  public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer)
+  {
+    return new DefaultErrorHandler(
+      recoverer,
+      SeekUtils.DEFAULT_BACK_OFF);
+  }
+}
index 6e2b7603fbb7a406b0d7448cd77d682d28f6ead7..30dc025d930c19b463b1416430cd3c65a2caaf10 100644 (file)
@@ -41,6 +41,7 @@ public class ExampleConsumer
     String key,
     Long value)
   {
+    if (value % 66 == 0) throw new RuntimeException("BOOM: Fachlicher Fehler!");
     consumed++;
     log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
   }
index e248b958d08674e2cd6fc9b37836bd85b9e33820..4a8efd3431067b1cc46847790c63f30aeebdb8b4 100644 (file)
@@ -30,6 +30,8 @@ spring:
       group-id: my-group
       properties:
         "[spring.deserializer.value.delegate.class]": org.apache.kafka.common.serialization.LongDeserializer
+    producer:
+      value-serializer: org.apache.kafka.common.serialization.LongSerializer
 logging:
   level:
     root: INFO
index 61197cbf1b82dc55d9c3123ab9a4ad38091b2e59..ae3eec88971a0699011480941572e126fc08d5c7 100644 (file)
@@ -10,6 +10,7 @@ import org.springframework.test.web.servlet.MockMvc;
 import java.time.Duration;
 
 import static de.juplo.kafka.ApplicationTests.PARTITIONS;
+import static de.juplo.kafka.ApplicationTests.DLT;
 import static de.juplo.kafka.ApplicationTests.TOPIC;
 import static org.awaitility.Awaitility.await;
 import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
@@ -22,10 +23,11 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
     "juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
     "juplo.consumer.topic=" + TOPIC })
 @AutoConfigureMockMvc
-@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
+@EmbeddedKafka(topics = { TOPIC, DLT }, partitions = PARTITIONS)
 public class ApplicationTests
 {
   static final String TOPIC = "FOO";
+  public static final String DLT = TOPIC + "-dlt";
   static final int PARTITIONS = 10;
 
   @Autowired