bash -c "
kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2
+ kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic dlq
+ kafka-topics --bootstrap-server kafka:9092 --create --topic dlq --partitions 2
"
cli:
package de.juplo.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.KafkaOperations;
+import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.util.backoff.FixedBackOff;
import java.util.function.Consumer;
}
@Bean
- public ApplicationErrorHandler errorHandler()
+ public DeadLetterPublishingRecoverer recoverer(
+ ApplicationProperties properties,
+ KafkaOperations<?, ?> template)
{
- return new ApplicationErrorHandler();
+ return new DeadLetterPublishingRecoverer(
+ template,
+ (record, exception) -> new TopicPartition(properties.getDlqTopic(), record.partition()));
+ }
+
+ @Bean
+ public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer)
+ {
+ return new DefaultErrorHandler(recoverer, new FixedBackOff(0l, 0l));
}
@Bean(destroyMethod = "close")
+++ /dev/null
-package de.juplo.kafka;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
-import org.springframework.kafka.listener.MessageListenerContainer;
-
-import java.util.List;
-import java.util.Optional;
-
-
-public class ApplicationErrorHandler extends CommonContainerStoppingErrorHandler
-{
- private Exception exception;
-
-
- public synchronized Optional<Exception> getException()
- {
- return Optional.ofNullable(exception);
- }
-
- public synchronized void clearException()
- {
- this.exception = null;
- }
-
-
- @Override
- public void handleOtherException(
- Exception thrownException, Consumer<?, ?> consumer,
- MessageListenerContainer container,
- boolean batchListener)
- {
- this.exception = thrownException;
- super.handleOtherException(thrownException, consumer, container, batchListener);
- }
-
- @Override
- public void handleRemaining(
- Exception thrownException,
- List<ConsumerRecord<?, ?>> records,
- Consumer<?, ?> consumer,
- MessageListenerContainer container)
- {
- this.exception = thrownException;
- super.handleRemaining(thrownException, records, consumer, container);
- }
-
- @Override
- public void handleBatch(
- Exception thrownException,
- ConsumerRecords<?, ?> data,
- Consumer<?, ?> consumer,
- MessageListenerContainer container,
- Runnable invokeListener)
- {
- this.exception = thrownException;
- super.handleBatch(thrownException, data, consumer, container, invokeListener);
- }
-}
@Override
public Health health()
{
- try
- {
- return consumer
- .exitStatus()
- .map(Health::down)
- .orElse(Health.outOfService())
- .build();
- }
- catch (IllegalStateException e)
- {
- return Health.up().build();
- }
+ return consumer.isRunning() ? Health.up().build() : Health.down().build();
}
}
@NotNull
@NotEmpty
private String topic;
+ @NotNull
+ @NotEmpty
+ private String dlqTopic;
}
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
@RestController
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
import java.util.function.Consumer;
String id;
@Autowired
Consumer<ConsumerRecord<K, V>> handler;
- @Autowired
- ApplicationErrorHandler errorHandler;
private long consumed = 0;
throw new IllegalStateException("Consumer instance " + id + " is already running!");
log.info("{} - Starting - consumed {} messages before", id, consumed);
- errorHandler.clearException();
registry.getListenerContainer(id).start();
}
log.info("{} - Stopped - consumed {} messages so far", id, consumed);
}
- public synchronized Optional<Exception> exitStatus()
+ public synchronized boolean isRunning()
{
- if (registry.getListenerContainer(id).isChildRunning())
- throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
-
- return errorHandler.getException();
+ return registry.getListenerContainer(id).isChildRunning();
}
}
consumer:
topic: test
+ dlq-topic: dlq
management:
endpoint:
shutdown:
client-id: DEV
auto-offset-reset: earliest
group-id: my-group
- value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
+ value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
properties:
+ spring.deserializer.value.delegate.class: "org.springframework.kafka.support.serializer.JsonDeserializer"
spring.json.type.mapping: "message:de.juplo.kafka.ClientMessage"
spring.json.trusted.packages: "de.juplo.kafka"
+ producer:
+ bootstrap-servers: :9092
+ client-id: DEV
+ value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
logging:
level:
root: INFO
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
+import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;
@TestPropertySource(
properties = {
"spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}",
+ "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
"consumer.topic=" + TOPIC })
@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
@Slf4j
await("100 records received")
.atMost(Duration.ofSeconds(30))
- .until(() -> receivedRecords.size() >= 100);
+ .until(() -> receivedRecords.size() == 100);
await("Offsets committed")
.atMost(Duration.ofSeconds(10))
compareToCommitedOffsets(newOffsets);
});
- assertThatExceptionOfType(IllegalStateException.class)
- .isThrownBy(() -> endlessConsumer.exitStatus())
- .describedAs("Consumer should still be running");
+ assertThat(endlessConsumer.isRunning())
+ .describedAs("Consumer should still be running")
+ .isTrue();
}
@Test
@Order(2)
- void commitsOffsetOfErrorForReprocessingOnError()
+ void commitsCurrentOffsetsOnError()
{
send100Messages((key, counter) ->
counter == 77
? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
: serialize(key, counter));
- await("Consumer failed")
+ await("99 records received")
.atMost(Duration.ofSeconds(30))
- .untilAsserted(() -> checkSeenOffsetsForProgress());
-
- compareToCommitedOffsets(newOffsets);
- assertThat(receivedRecords.size())
- .describedAs("Received not all sent events")
- .isLessThan(100);
-
- assertThatNoException()
- .describedAs("Consumer should not be running")
- .isThrownBy(() -> endlessConsumer.exitStatus());
- assertThat(endlessConsumer.exitStatus())
- .containsInstanceOf(RecordDeserializationException.class)
- .describedAs("Consumer should have exited abnormally");
+ .until(() -> receivedRecords.size() == 99);
+
+ await("Offsets committed")
+ .atMost(Duration.ofSeconds(10))
+ .untilAsserted(() ->
+ {
+ // UNSCHÖN:
+ // Funktioniert nur, weil nach der Nachrichten, die den
+ // Deserialisierungs-Fehler auslöst noch valide Nachrichten
+ // gelesen werden.
+ // GRUND:
+ // Der MessageHandler sieht den Offset der Fehlerhaften
+ // Nachricht nicht!
+ checkSeenOffsetsForProgress();
+ compareToCommitedOffsets(newOffsets);
+ });
+
+ assertThat(endlessConsumer.isRunning())
+ .describedAs("Consumer should still be running")
+ .isTrue();
}