* This version is configured via the application.yml.
* Via the application.yml alone, no configuration can be reached that
  correctly redirects both the business errors raised after successful
  JSON deserialization and the poison pills.
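* For illustration only, a minimal sketch of such a property-only setup
  (assuming Spring Kafka's ErrorHandlingDeserializer; this snippet is not part
  of the excerpt): it turns poison pills into deserialization errors that an
  error handler can see, but redirecting them and the business errors to the
  dead-letter topic still requires the error-handler beans wired up below.

    spring:
      kafka:
        consumer:
          # Wrap the real deserializer so that a broken record does not kill the container
          value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
          properties:
            # Delegate that performs the actual JSON deserialization
            spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer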
echo 'BOOM!' | kafkacat -P -b :9092 -t test
# end::poisonpill[]
+echo "Writing logic error..."
+# tag::logicerror[]
+echo 66 | http -v :8080/peter?error=1
+# end::logicerror[]
+
echo 66 | http -v :8080/peter
echo 7 | http -v :8080/klaus
sleep 5
docker-compose stop consumer-1 consumer-2
docker-compose logs -f consumer-1 consumer-2
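+# Dump the dead letter topic: partition, offset, key=value and headers for every record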
+# tag::kafkacat[]
+kafkacat -b :9092 -t test.DLT -e -f 'p=%p|o=%o|%k=%s|h=%h\n'
+# end::kafkacat[]
command: >
bash -c "
kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
+ kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test.DLT
kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2 --replication-factor 3 --config min.insync.replicas=2
+ kafka-topics --bootstrap-server kafka:9092 --create --topic test.DLT --partitions 2 --replication-factor 3 --config min.insync.replicas=2
kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+ kafka-topics --bootstrap-server kafka:9092 --describe --topic test.DLT
"
depends_on:
- kafka-1
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.KafkaOperations;
+import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.util.backoff.FixedBackOff;
@SpringBootApplication
-@EnableConfigurationProperties(ApplicationProperties.class)
+@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
public class Application
{
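+ // Re-publishes records that could not be processed to the dead-letter topic
+ // (by default <original topic>.DLT, using the same partition as the source record)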
+ @Bean
+ public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
+ KafkaOperations<?, ?> kafkaTemplate)
+ {
+ return new DeadLetterPublishingRecoverer(kafkaTemplate);
+ }
+
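+ // No retries: FixedBackOff(0, 0) hands a failed record to the recoverer
+ // immediately after the first failed delivery attempt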
+ @Bean
+ public DefaultErrorHandler errorHandler(
+ DeadLetterPublishingRecoverer recoverer)
+ {
+ return new DefaultErrorHandler(
+ recoverer,
+ new FixedBackOff(0L, 0L));
+ }
+
+
public static void main(String[] args)
{
SpringApplication.run(Application.class, args);
spring.json.type.mapping: >
ADD:de.juplo.kafka.MessageAddNumber,
CALC:de.juplo.kafka.MessageCalculateSum
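+ # Serializers for the KafkaTemplate that the DeadLetterPublishingRecoverer
+ # uses to re-publish failed records to the dead-letter topic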
+ producer:
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
logging:
level:
root: INFO
import org.springframework.boot.test.web.server.LocalServerPort;
import org.springframework.kafka.test.context.EmbeddedKafka;
+import static de.juplo.kafka.ApplicationIT.DLT;
import static de.juplo.kafka.ApplicationIT.TOPIC;
properties = {
"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"simple.consumer.topic=" + TOPIC })
-@EmbeddedKafka(topics = TOPIC)
+@EmbeddedKafka(topics = { TOPIC, DLT })
public class ApplicationIT
{
public static final String TOPIC = "FOO";
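+ // Dead-letter topic name that the DeadLetterPublishingRecoverer derives by default: <topic>.DLT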
+ public static final String DLT = TOPIC + ".DLT";
@LocalServerPort
private int port;