* The `DeadLetterPublishingRecoverer` has to be instantiated explicitly.
* To make it known to the Spring Kafka beans, the
`DefaultErrorHandler` bean has to be overridden.
* The recoverer is passed to the handler together with a back-off
strategy (see the condensed sketch right after this list).
* So that the `DeadLetterPublishingRecoverer` can send the messages
that are to be forwarded, a `KafkaTemplate` has to be defined whose
value serializer can handle both raw `byte[]` data (the poison pills
that could not be deserialized) and the JSON message types.
* The producer apparently needs a separate entry for
`bootstrap-servers` under `spring.kafka.producer`. The entry
`spring.kafka.bootstrap-servers` is not picked up here!
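For orientation, a condensed sketch of the wiring described above. It only shows the recoverer and error-handler beans; the class name `DeadLetterConfiguration` is a placeholder for this sketch, and the injected `KafkaOperations` is assumed to be the `KafkaTemplate` that the full configuration further down defines with the `DelegatingByTypeSerializer`:

package de.juplo.kafka;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class DeadLetterConfiguration
{
  @Bean
  public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
      KafkaOperations<?, ?> kafkaTemplate)
  {
    // By default, the recoverer forwards a failed record to the
    // topic <original topic>.DLT, using the given KafkaTemplate
    return new DeadLetterPublishingRecoverer(kafkaTemplate);
  }

  @Bean
  public DefaultErrorHandler errorHandler(
      DeadLetterPublishingRecoverer recoverer)
  {
    // FixedBackOff(0, 0): no delay and no retries -- a failed record
    // is handed to the recoverer (and thus to the DLT) immediately
    return new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 0L));
  }
}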
bash -c "
kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic in
kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic out
+ kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic out.DLT
kafka-topics --bootstrap-server kafka:9092 --create --topic in --partitions 2 --replication-factor 3 --config min.insync.replicas=2
kafka-topics --bootstrap-server kafka:9092 --create --topic out --partitions 2 --replication-factor 3 --config min.insync.replicas=2
+ kafka-topics --bootstrap-server kafka:9092 --create --topic out.DLT --partitions 2 --replication-factor 3 --config min.insync.replicas=2
kafka-topics --bootstrap-server kafka:9092 --describe --topic in
kafka-topics --bootstrap-server kafka:9092 --describe --topic out
+ kafka-topics --bootstrap-server kafka:9092 --describe --topic out.DLT
"
cli:
environment:
server.port: 8080
spring.kafka.bootstrap-servers: kafka:9092
+ spring.kafka.producer.bootstrap-servers: kafka:9092
spring.kafka.client-id: adder-1
spring.kafka.consumer.auto-commit-interval: 1s
sumup.adder.throttle: 3ms
environment:
server.port: 8080
spring.kafka.bootstrap-servers: kafka:9092
+ spring.kafka.producer.bootstrap-servers: kafka:9092
spring.kafka.client-id: adder-2
spring.kafka.consumer.auto-commit-interval: 1s
sumup.adder.throttle: 3ms
package de.juplo.kafka;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import java.util.Map;
import java.util.Optional;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaOperations;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.util.backoff.FixedBackOff;
@Configuration
endpointRegistry,
recordHandler);
}
+
+ @Bean
+ public ProducerFactory<String, Object> producerFactory(
+ KafkaProperties properties)
+ {
+ return new DefaultKafkaProducerFactory<>(
+ properties.getProducer().buildProperties(),
+ new StringSerializer(),
+ new DelegatingByTypeSerializer(
+ Map.of(
+ byte[].class, new ByteArraySerializer(),
+ MessageAddNumber.class, new JsonSerializer<>(),
+ MessageCalculateSum.class, new JsonSerializer<>())));
+ }
+
+ @Bean
+ public KafkaTemplate<String, Object> kafkaTemplate(
+ ProducerFactory<String, Object> producerFactory)
+ {
+ return new KafkaTemplate<>(producerFactory);
+ }
+
+ @Bean
+ public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
+ KafkaOperations<?, ?> kafkaTemplate)
+ {
+ return new DeadLetterPublishingRecoverer(kafkaTemplate);
+ }
+
+ @Bean
+ public DefaultErrorHandler errorHandler(
+ DeadLetterPublishingRecoverer recoverer)
+ {
+ return new DefaultErrorHandler(
+ recoverer,
+ new FixedBackOff(0l, 0l));
+ }
}
auto-offset-reset: earliest
auto-commit-interval: 5s
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
+ value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
properties:
partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor
metadata.max.age.ms: 1000
+ spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
+ spring.json.type.mapping: >
+ ADD:de.juplo.kafka.MessageAddNumber,
+ CALC:de.juplo.kafka.MessageCalculateSum
+ producer:
+ bootstrap-servers: :9092
+ properties:
spring.json.type.mapping: >
ADD:de.juplo.kafka.MessageAddNumber,
CALC:de.juplo.kafka.MessageCalculateSum
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.*;
@TestPropertySource(
properties = {
"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+ "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
"sumup.adder.topic=" + TOPIC,
"spring.kafka.consumer.auto-commit-interval=500ms",
"spring.mongodb.embedded.version=4.4.13" })
{
recordGenerator.generate(true, false, messageSender);
- int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages();
+ int numberOfValidMessages =
+ recordGenerator.getNumberOfMessages() -
+ recordGenerator.getNumberOfPoisonPills();
- await("Consumer failed")
+ await(numberOfValidMessages + " records received")
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(1))
- .until(() -> !endlessConsumer.running());
-
- checkSeenOffsetsForProgress();
- assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+ .until(() -> recordHandler.receivedMessages >= numberOfValidMessages);
- endlessConsumer.start();
- await("Consumer failed")
- .atMost(Duration.ofSeconds(30))
+ await("Offsets committed")
+ .atMost(Duration.ofSeconds(10))
.pollInterval(Duration.ofSeconds(1))
- .until(() -> !endlessConsumer.running());
-
- checkSeenOffsetsForProgress();
- assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
- assertThat(recordHandler.receivedMessages)
- .describedAs("Received not all sent events")
- .isLessThan(numberOfGeneratedMessages);
+ .untilAsserted(() ->
+ {
+ checkSeenOffsetsForProgress();
+ assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+ });
assertThat(endlessConsumer.running())
- .describedAs("Consumer should have exited")
- .isFalse();
+ .describedAs("Consumer should still be running")
+ .isTrue();
+ endlessConsumer.stop();
recordGenerator.assertBusinessLogic();
}
{
recordGenerator.generate(false, true, messageSender);
- int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages();
+ int numberOfValidMessages =
+ recordGenerator.getNumberOfMessages() -
+ recordGenerator.getNumberOfLogicErrors();
- await("Consumer failed")
+ await(numberOfValidMessages + " records received")
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(1))
- .until(() -> !endlessConsumer.running());
-
- checkSeenOffsetsForProgress();
- assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+ .until(() -> recordHandler.receivedMessages >= numberOfValidMessages);
- endlessConsumer.start();
- await("Consumer failed")
- .atMost(Duration.ofSeconds(30))
+ await("Offsets committed")
+ .atMost(Duration.ofSeconds(10))
.pollInterval(Duration.ofSeconds(1))
- .until(() -> !endlessConsumer.running());
-
- assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+ .untilAsserted(() ->
+ {
+ checkSeenOffsetsForProgress();
+ assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+ });
assertThat(endlessConsumer.running())
- .describedAs("Consumer should not be running")
- .isFalse();
+ .describedAs("Consumer should still be running")
+ .isTrue();
+ endlessConsumer.stop();
recordGenerator.assertBusinessLogic();
}