echo -n Bereits konfiguriert:
cat INITIALIZED
kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+ kafka-topics --bootstrap-server kafka:9092 --describe --topic test-dlt
else
kafka-topics --bootstrap-server kafka:9092 \
--delete \
--config min.insync.replicas=2 \
&& echo Das Topic \'test\' wurde erfolgreich angelegt: \
&& kafka-topics --bootstrap-server kafka:9092 --describe --topic test \
+ && kafka-topics --bootstrap-server kafka:9092 \
+ --delete \
+ --if-exists \
+ --topic test-dlt
+ kafka-topics --bootstrap-server kafka:9092 \
+ --create \
+ --topic test-dlt \
+ --partitions 2 \
+ --replication-factor 3 \
+ --config min.insync.replicas=2 \
+ && echo Das Topic \'test-dlt\' wurde erfolgreich angelegt: \
+ && kafka-topics --bootstrap-server kafka:9092 --describe --topic test-dlt \
&& date > INITIALIZED
fi
stop_grace_period: 0s
juplo.producer.topic: test
consumer:
- image: juplo/spring-consumer:1.1-messageconverter-SNAPSHOT
+ image: juplo/spring-consumer:1.1-messageconverter-dlt-SNAPSHOT
environment:
spring.kafka.bootstrap-servers: kafka:9092
spring.kafka.client-id: consumer
juplo.consumer.topic: test
peter:
- image: juplo/spring-consumer:1.1-messageconverter-SNAPSHOT
+ image: juplo/spring-consumer:1.1-messageconverter-dlt-SNAPSHOT
environment:
spring.kafka.bootstrap-servers: kafka:9092
spring.kafka.client-id: peter
juplo.consumer.topic: test
ute:
- image: juplo/spring-consumer:1.1-messageconverter-SNAPSHOT
+ image: juplo/spring-consumer:1.1-messageconverter-dlt-SNAPSHOT
environment:
spring.kafka.bootstrap-servers: kafka:9092
spring.kafka.client-id: ute
<artifactId>spring-consumer</artifactId>
<name>Spring Consumer</name>
<description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
- <version>1.1-messageconverter-SNAPSHOT</version>
+ <version>1.1-messageconverter-dlt-SNAPSHOT</version>
<properties>
<java.version>21</java.version>
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.KafkaOperations;
+import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.kafka.listener.SeekUtils;
import org.springframework.kafka.support.converter.JsonMessageConverter;
import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper;
return converter;
}
+
+ @Bean
+ public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaOperations<?, ?> kafkaTemplate)
+ {
+ return new DeadLetterPublishingRecoverer(kafkaTemplate);
+ }
+
+ @Bean
+ public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer)
+ {
+ return new DefaultErrorHandler(
+ recoverer,
+ SeekUtils.DEFAULT_BACK_OFF);
+ }
}