bash -c "
         kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
         kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2
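+        # The DLQ mirrors the partition count of the source topic, so that
+        # failed records can be re-published to their original partition.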
+        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic dlq
+        kafka-topics --bootstrap-server kafka:9092 --create --topic dlq --partitions 2
       "
 
   cli:
 
 package de.juplo.kafka;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.KafkaOperations;
+import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.util.backoff.FixedBackOff;
 
 import java.util.function.Consumer;
 
   }
 
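+  // Publishes records that could not be processed to the DLQ topic,
+  // keeping the partition number of the original record.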
   @Bean
-  public ApplicationErrorHandler errorHandler()
+  public DeadLetterPublishingRecoverer recoverer(
+      ApplicationProperties properties,
+      KafkaOperations<?, ?> template)
   {
-    return new ApplicationErrorHandler();
+    return new DeadLetterPublishingRecoverer(
+        template,
+        (record, exception) -> new TopicPartition(properties.getDlqTopic(), record.partition()));
+  }
+
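+  // A FixedBackOff with zero delay and zero retries hands failed records
+  // straight to the recoverer, i.e. they go to the DLQ immediately.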
+  @Bean
+  public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer)
+  {
+    return new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 0L));
   }
 
   @Bean(destroyMethod = "close")
 
+++ /dev/null
-package de.juplo.kafka;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
-import org.springframework.kafka.listener.MessageListenerContainer;
-
-import java.util.List;
-import java.util.Optional;
-
-
-public class ApplicationErrorHandler extends CommonContainerStoppingErrorHandler
-{
-  private Exception exception;
-
-
-  public synchronized Optional<Exception> getException()
-  {
-    return Optional.ofNullable(exception);
-  }
-
-  public synchronized void clearException()
-  {
-    this.exception = null;
-  }
-
-
-  @Override
-  public void handleOtherException(
-      Exception thrownException, Consumer<?, ?> consumer,
-      MessageListenerContainer container,
-      boolean batchListener)
-  {
-    this.exception = thrownException;
-    super.handleOtherException(thrownException, consumer, container, batchListener);
-  }
-
-  @Override
-  public void handleRemaining(
-      Exception thrownException,
-      List<ConsumerRecord<?, ?>> records,
-      Consumer<?, ?> consumer,
-      MessageListenerContainer container)
-  {
-    this.exception = thrownException;
-    super.handleRemaining(thrownException, records, consumer, container);
-  }
-
-  @Override
-  public void handleBatch(
-      Exception thrownException,
-      ConsumerRecords<?, ?> data,
-      Consumer<?, ?> consumer,
-      MessageListenerContainer container,
-      Runnable invokeListener)
-  {
-    this.exception = thrownException;
-    super.handleBatch(thrownException, data, consumer, container, invokeListener);
-  }
-}
 
   @Override
   public Health health()
   {
-    try
-    {
-      return consumer
-          .exitStatus()
-          .map(Health::down)
-          .orElse(Health.outOfService())
-          .build();
-    }
-    catch (IllegalStateException e)
-    {
-      return Health.up().build();
-    }
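+    // With the DefaultErrorHandler in place the consumer no longer stops on
+    // errors, so health simply mirrors whether the container is running.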
+    return consumer.isRunning() ? Health.up().build() : Health.down().build();
   }
 }
 
   @NotNull
   @NotEmpty
   private String topic;
+  @NotNull
+  @NotEmpty
+  private String dlqTopic;
 }
 
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 
 
 @RestController
 
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 import java.util.function.Consumer;
 
 
   String id;
   @Autowired
   Consumer<ConsumerRecord<K, V>> handler;
-  @Autowired
-  ApplicationErrorHandler errorHandler;
 
   private long consumed = 0;
 
       throw new IllegalStateException("Consumer instance " + id + " is already running!");
 
     log.info("{} - Starting - consumed {} messages before", id, consumed);
-    errorHandler.clearException();
     registry.getListenerContainer(id).start();
   }
 
     log.info("{} - Stopped - consumed {} messages so far", id, consumed);
   }
 
-  public synchronized Optional<Exception> exitStatus()
+  public synchronized boolean isRunning()
   {
-    if (registry.getListenerContainer(id).isChildRunning())
-      throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
-
-    return errorHandler.getException();
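+    // isChildRunning() also covers the child containers of a concurrent
+    // listener container.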
+    return registry.getListenerContainer(id).isChildRunning();
   }
 }
 
 consumer:
   topic: test
+  dlq-topic: dlq
 management:
   endpoint:
     shutdown:
       client-id: DEV
       auto-offset-reset: earliest
       group-id: my-group
-      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
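+      # The ErrorHandlingDeserializer catches deserialization errors and hands
+      # them to the error handler instead of endlessly crashing the consumer;
+      # the actual work is delegated to the class configured below.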
+      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
       properties:
+        spring.deserializer.value.delegate.class: "org.springframework.kafka.support.serializer.JsonDeserializer"
         spring.json.type.mapping: "message:de.juplo.kafka.ClientMessage"
         spring.json.trusted.packages: "de.juplo.kafka"
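+    # This producer is used by the DeadLetterPublishingRecoverer: it re-publishes
+    # the raw bytes of the failed record, hence the ByteArraySerializer.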
+    producer:
+      bootstrap-servers: :9092
+      client-id: DEV
+      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
 logging:
   level:
     root: INFO
 
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Import;
 import org.springframework.context.annotation.Primary;
+import org.springframework.kafka.listener.MessageListenerContainer;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.context.TestPropertySource;
 @TestPropertySource(
                properties = {
                                "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}",
+                               "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
                                "consumer.topic=" + TOPIC })
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
 @Slf4j
 
                await("100 records received")
                                .atMost(Duration.ofSeconds(30))
-                               .until(() -> receivedRecords.size() >= 100);
+                               .until(() -> receivedRecords.size() == 100);
 
                await("Offsets committed")
                                .atMost(Duration.ofSeconds(10))
                                        compareToCommitedOffsets(newOffsets);
                                });
 
-               assertThatExceptionOfType(IllegalStateException.class)
-                               .isThrownBy(() -> endlessConsumer.exitStatus())
-                               .describedAs("Consumer should still be running");
+               assertThat(endlessConsumer.isRunning())
+                               .describedAs("Consumer should still be running")
+                               .isTrue();
        }
 
        @Test
        @Order(2)
-       void commitsOffsetOfErrorForReprocessingOnError()
+       void commitsCurrentOffsetsOnError()
        {
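+               // Record no. 77 is sent as a plain string that cannot be
+               // deserialized as JSON - a deliberate poison pill.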
                send100Messages((key, counter) ->
                                counter == 77
                                                ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
                                                : serialize(key, counter));
 
-               await("Consumer failed")
+               await("99 records received")
                                .atMost(Duration.ofSeconds(30))
-                               .untilAsserted(() -> checkSeenOffsetsForProgress());
-
-               compareToCommitedOffsets(newOffsets);
-               assertThat(receivedRecords.size())
-                               .describedAs("Received not all sent events")
-                               .isLessThan(100);
-
-               assertThatNoException()
-                               .describedAs("Consumer should not be running")
-                               .isThrownBy(() -> endlessConsumer.exitStatus());
-               assertThat(endlessConsumer.exitStatus())
-                               .containsInstanceOf(RecordDeserializationException.class)
-                               .describedAs("Consumer should have exited abnormally");
+                               .until(() -> receivedRecords.size() == 99);
+
+               await("Offsets committed")
+                               .atMost(Duration.ofSeconds(10))
+                               .untilAsserted(() ->
+                               {
+                                       // NOT PRETTY:
+                                       // This only works because valid messages are still read
+                                       // after the message that triggers the deserialization
+                                       // error.
+                                       // REASON:
+                                       // The MessageHandler never sees the offset of the
+                                       // faulty message!
+                                       checkSeenOffsetsForProgress();
+                                       compareToCommitedOffsets(newOffsets);
+                               });
+
+               assertThat(endlessConsumer.isRunning())
+                               .describedAs("Consumer should still be running")
+                               .isTrue();
        }