Springify: Merge and revival of the rebalance listener for counting
author Kai Moritz <kai@juplo.de>
Fri, 22 Apr 2022 15:59:18 +0000 (17:59 +0200)
committer Kai Moritz <kai@juplo.de>
Fri, 22 Apr 2022 16:09:47 +0000 (18:09 +0200)
pom.xml
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationErrorHandler.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/ClientMessage.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/resources/application.yml
src/main/resources/logback.xml
src/test/java/de/juplo/kafka/ApplicationTests.java

diff --git a/pom.xml b/pom.xml
index f218085..21466ec 100644
--- a/pom.xml
+++ b/pom.xml
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.projectlombok</groupId>
       <artifactId>lombok</artifactId>
diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java
index 6601e6d..f227bbe 100644
--- a/src/main/java/de/juplo/kafka/Application.java
+++ b/src/main/java/de/juplo/kafka/Application.java
@@ -7,11 +7,6 @@ import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 
-import javax.annotation.PreDestroy;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
 
 @SpringBootApplication
 @Slf4j
@@ -19,8 +14,6 @@ public class Application implements ApplicationRunner
 {
   @Autowired
   EndlessConsumer endlessConsumer;
-  @Autowired
-  ExecutorService executor;
 
 
   @Override
@@ -30,33 +23,6 @@ public class Application implements ApplicationRunner
     endlessConsumer.start();
   }
 
-  @PreDestroy
-  public void stopExecutor()
-  {
-    try
-    {
-      log.info("Shutting down the ExecutorService.");
-      executor.shutdown();
-      log.info("Waiting 5 seconds for the ExecutorService to terminate...");
-      executor.awaitTermination(5, TimeUnit.SECONDS);
-    }
-    catch (InterruptedException e)
-    {
-      log.error("Exception while waiting for the termination of the ExecutorService: {}", e.toString());
-    }
-    finally
-    {
-      if (!executor.isShutdown())
-      {
-        log.warn("Forcing shutdown of ExecutorService!");
-        executor
-            .shutdownNow()
-            .forEach(runnable -> log.warn("Unfinished task: {}", runnable.getClass().getSimpleName()));
-      }
-      log.info("Shutdow of ExecutorService finished");
-    }
-  }
-
 
   public static void main(String[] args)
   {
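
The removed @PreDestroy hook is no longer needed: with spring-kafka, the consumer runs inside a lifecycle-managed listener container instead of a hand-rolled ExecutorService, so closing the application context stops the consumer thread. A minimal sketch of the new shutdown path (not code from this commit):

    import org.springframework.boot.SpringApplication;
    import org.springframework.context.ConfigurableApplicationContext;

    public class LifecycleSketch
    {
      public static void main(String[] args)
      {
        ConfigurableApplicationContext context =
            SpringApplication.run(Application.class, args);
        // Closing the context stops every @KafkaListener container and its
        // consumer thread gracefully -- no manual executor shutdown needed.
        context.close();
      }
    }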
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 4054e93..26b81cb 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -1,25 +1,21 @@
 package de.juplo.kafka;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.ConsumerFactory;
 
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.function.Consumer;
 
 
 @Configuration
-@EnableConfigurationProperties(ApplicationProperties.class)
+@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
 public class ApplicationConfiguration
 {
   @Bean
-  public Consumer<ConsumerRecord<String, Long>> consumer()
+  public Consumer<ConsumerRecord<String, ClientMessage>> consumer()
   {
     return (record) ->
     {
@@ -28,40 +24,14 @@ public class ApplicationConfiguration
   }
 
   @Bean
-  public EndlessConsumer<String, Long> endlessConsumer(
-      KafkaConsumer<String, Long> kafkaConsumer,
-      ExecutorService executor,
-      Consumer<ConsumerRecord<String, Long>> handler,
-      ApplicationProperties properties)
+  public ApplicationErrorHandler errorHandler()
   {
-    return
-        new EndlessConsumer<>(
-            executor,
-            properties.getClientId(),
-            properties.getTopic(),
-            kafkaConsumer,
-            handler);
-  }
-
-  @Bean
-  public ExecutorService executor()
-  {
-    return Executors.newSingleThreadExecutor();
+    return new ApplicationErrorHandler();
   }
 
   @Bean(destroyMethod = "close")
-  public KafkaConsumer<String, Long> kafkaConsumer(ApplicationProperties properties)
+  public org.apache.kafka.clients.consumer.Consumer<String, ClientMessage> kafkaConsumer(ConsumerFactory<String, ClientMessage> factory)
   {
-    Properties props = new Properties();
-
-    props.put("bootstrap.servers", properties.getBootstrapServer());
-    props.put("group.id", properties.getGroupId());
-    props.put("client.id", properties.getClientId());
-    props.put("auto.offset.reset", properties.getAutoOffsetReset());
-    props.put("metadata.max.age.ms", "1000");
-    props.put("key.deserializer", StringDeserializer.class.getName());
-    props.put("value.deserializer", LongDeserializer.class.getName());
-
-    return new KafkaConsumer<>(props);
+    return factory.createConsumer();
   }
 }
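
The hand-configured KafkaConsumer is replaced by a consumer obtained from Spring Boot's auto-configured ConsumerFactory, so all connection settings now come from spring.kafka.consumer.*. Roughly what the auto-configuration provides behind the scenes (a sketch, assuming Spring Boot 2.6):

    import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

    // Sketch: every spring.kafka.consumer.* property ends up in the
    // map that is passed to the factory.
    @Configuration
    public class ConsumerFactorySketch
    {
      @Bean
      public ConsumerFactory<String, ClientMessage> consumerFactory(KafkaProperties properties)
      {
        return new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties());
      }
    }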
diff --git a/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java b/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java
new file mode 100644
index 0000000..273f509
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java
@@ -0,0 +1,61 @@
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
+import org.springframework.kafka.listener.MessageListenerContainer;
+
+import java.util.List;
+import java.util.Optional;
+
+
+public class ApplicationErrorHandler extends CommonContainerStoppingErrorHandler
+{
+  private Exception exception;
+
+
+  public synchronized Optional<Exception> getException()
+  {
+    return Optional.ofNullable(exception);
+  }
+
+  public synchronized void clearException()
+  {
+    this.exception = null;
+  }
+
+
+  @Override
+  public void handleOtherException(
+      Exception thrownException, Consumer<?, ?> consumer,
+      MessageListenerContainer container,
+      boolean batchListener)
+  {
+    this.exception = thrownException;
+    super.handleOtherException(thrownException, consumer, container, batchListener);
+  }
+
+  @Override
+  public void handleRemaining(
+      Exception thrownException,
+      List<ConsumerRecord<?, ?>> records,
+      Consumer<?, ?> consumer,
+      MessageListenerContainer container)
+  {
+    this.exception = thrownException;
+    super.handleRemaining(thrownException, records, consumer, container);
+  }
+
+  @Override
+  public void handleBatch(
+      Exception thrownException,
+      ConsumerRecords<?, ?> data,
+      Consumer<?, ?> consumer,
+      MessageListenerContainer container,
+      Runnable invokeListener)
+  {
+    this.exception = thrownException;
+    super.handleBatch(thrownException, data, consumer, container, invokeListener);
+  }
+}
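
ApplicationErrorHandler extends CommonContainerStoppingErrorHandler: every exception still stops the container, but the cause is now recorded so that EndlessConsumer.exitStatus() can report it. Spring Boot 2.6+ should inject a unique CommonErrorHandler bean into the default container factory by itself; the manual equivalent would look roughly like this (a sketch, not part of this commit):

    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;

    // Sketch: manual wiring, only needed without Boot's auto-configuration.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ClientMessage> kafkaListenerContainerFactory(
        ConsumerFactory<String, ClientMessage> consumerFactory,
        ApplicationErrorHandler errorHandler)
    {
      ConcurrentKafkaListenerContainerFactory<String, ClientMessage> factory =
          new ConcurrentKafkaListenerContainerFactory<>();
      factory.setConsumerFactory(consumerFactory);
      factory.setCommonErrorHandler(errorHandler); // stops the container and records the cause
      return factory;
    }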
diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java
index fa731c5..c7c4f78 100644
--- a/src/main/java/de/juplo/kafka/ApplicationProperties.java
+++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java
@@ -15,19 +15,7 @@ import javax.validation.constraints.NotNull;
 @Setter
 public class ApplicationProperties
 {
-  @NotNull
-  @NotEmpty
-  private String bootstrapServer;
-  @NotNull
-  @NotEmpty
-  private String groupId;
-  @NotNull
-  @NotEmpty
-  private String clientId;
   @NotNull
   @NotEmpty
   private String topic;
-  @NotNull
-  @NotEmpty
-  private String autoOffsetReset;
 }
diff --git a/src/main/java/de/juplo/kafka/ClientMessage.java b/src/main/java/de/juplo/kafka/ClientMessage.java
new file mode 100644
index 0000000..d18800b
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/ClientMessage.java
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+import lombok.Data;
+
+
+@Data
+public class ClientMessage
+{
+  String client;
+  String message;
+}
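
ClientMessage is the target type for the JsonDeserializer configured in application.yml. A sketch of the equivalent programmatic setup (assuming the spring-kafka 2.8 API):

    import java.util.Map;
    import org.springframework.kafka.support.serializer.JsonDeserializer;

    // Sketch: the same configuration that application.yml applies declaratively.
    JsonDeserializer<ClientMessage> clientMessageDeserializer()
    {
      JsonDeserializer<ClientMessage> deserializer = new JsonDeserializer<>();
      deserializer.configure(
          Map.of(
              JsonDeserializer.TYPE_MAPPINGS, "message:de.juplo.kafka.ClientMessage",
              JsonDeserializer.TRUSTED_PACKAGES, "de.juplo.kafka"),
          false); // isKey = false: configure as the value deserializer
      // A payload like {"client":"0","message":"42"} carrying the header
      // __TypeId__=message is then bound to a ClientMessage instance.
      return deserializer;
    }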
diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java
index 93580ee..480e7d1 100644
--- a/src/main/java/de/juplo/kafka/DriverController.java
+++ b/src/main/java/de/juplo/kafka/DriverController.java
@@ -8,6 +8,7 @@ import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.ResponseStatus;
 import org.springframework.web.bind.annotation.RestController;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
@@ -26,7 +27,7 @@ public class DriverController
   }
 
   @PostMapping("stop")
-  public void stop() throws ExecutionException, InterruptedException
+  public void stop()
   {
     consumer.stop();
   }
@@ -34,7 +35,7 @@ public class DriverController
   @GetMapping("seen")
   public Map<Integer, Map<String, Long>> seen()
   {
-    return consumer.getSeen();
+    return new HashMap<>();
   }
 
   @ExceptionHandler
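
seen() is stubbed to an empty map for now: EndlessConsumer still maintains its seen map in the rebalance callbacks, but no longer exposes it. Reviving the endpoint would presumably delegate again (hypothetical sketch; getSeen() no longer exists after this commit):

    // Hypothetical: how the endpoint could delegate again once a getter is revived.
    @GetMapping("seen")
    public Map<Integer, Map<String, Long>> seen()
    {
      return consumer.getSeen(); // getSeen() would have to be re-added to EndlessConsumer
    }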
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index 8802df9..6d0c69d 100644
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -2,35 +2,36 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RecordDeserializationException;
-import org.apache.kafka.common.errors.WakeupException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
+import org.springframework.stereotype.Component;
 
-import javax.annotation.PreDestroy;
-import java.time.Duration;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
 
 
+@Component
 @Slf4j
 @RequiredArgsConstructor
-public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnable
+public class EndlessConsumer<K, V> implements ConsumerAwareRebalanceListener
 {
-  private final ExecutorService executor;
-  private final String id;
-  private final String topic;
-  private final Consumer<K, V> consumer;
-  private final java.util.function.Consumer<ConsumerRecord<K, V>> handler;
+  @Autowired
+  private KafkaListenerEndpointRegistry registry;
+  @Value("${spring.kafka.consumer.client-id}")
+  String id;
+  @Autowired
+  Consumer<ConsumerRecord<K, V>> handler;
+  @Autowired
+  ApplicationErrorHandler errorHandler;
 
-  private final Lock lock = new ReentrantLock();
-  private final Condition condition = lock.newCondition();
-  private boolean running = false;
-  private Exception exception;
   private long consumed = 0;
 
   private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
@@ -38,7 +39,9 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
 
 
   @Override
-  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  public void onPartitionsRevokedBeforeCommit(
+      org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
+      Collection<TopicPartition> partitions)
   {
     partitions.forEach(tp ->
     {
@@ -66,7 +69,9 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
   }
 
   @Override
-  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+  public void onPartitionsAssigned(
+      org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
+      Collection<TopicPartition> partitions)
   {
     partitions.forEach(tp ->
     {
@@ -79,208 +84,54 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
   }
 
 
-  @Override
-  public void run()
-  {
-    try
-    {
-      log.info("{} - Subscribing to topic {}", id, topic);
-      consumer.subscribe(Arrays.asList(topic), this);
-
-      while (true)
-      {
-        ConsumerRecords<K, V> records =
-            consumer.poll(Duration.ofSeconds(1));
-
-        // Do something with the data...
-        log.info("{} - Received {} messages", id, records.count());
-        for (ConsumerRecord<K, V> record : records)
-        {
-          log.info(
-              "{} - {}: {}/{} - {}={}",
-              id,
-              record.offset(),
-              record.topic(),
-              record.partition(),
-              record.key(),
-              record.value()
-          );
-
-          handler.accept(record);
-
-          consumed++;
-
-          Integer partition = record.partition();
-          String key = record.key() == null ? "NULL" : record.key().toString();
-          Map<String, Long> byKey = seen.get(partition);
-
-          if (!byKey.containsKey(key))
-            byKey.put(key, 0l);
-
-          long seenByKey = byKey.get(key);
-          seenByKey++;
-          byKey.put(key, seenByKey);
-        }
-      }
-    }
-    catch(WakeupException e)
-    {
-      log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
-      consumer.commitSync();
-      shutdown();
-    }
-    catch(RecordDeserializationException e)
-    {
-      TopicPartition tp = e.topicPartition();
-      long offset = e.offset();
-      log.error(
-          "{} - Could not deserialize  message on topic {} with offset={}: {}",
-          id,
-          tp,
-          offset,
-          e.getCause().toString());
-
-      consumer.commitSync();
-      shutdown(e);
-    }
-    catch(Exception e)
-    {
-      log.error("{} - Unexpected error: {}", id, e.toString(), e);
-      shutdown(e);
-    }
-    finally
-    {
-      log.info("{} - Consumer-Thread exiting", id);
-    }
-  }
-
-  private void shutdown()
-  {
-    shutdown(null);
-  }
-
-  private void shutdown(Exception e)
-  {
-    lock.lock();
-    try
-    {
-      try
-      {
-        log.info("{} - Unsubscribing from topic {}", id, topic);
-        consumer.unsubscribe();
-      }
-      catch (Exception ue)
-      {
-        log.error(
-            "{} - Error while unsubscribing from topic {}: {}",
-            id,
-            topic,
-            ue.toString());
-      }
-      finally
-      {
-        running = false;
-        exception = e;
-        condition.signal();
-      }
-    }
-    finally
-    {
-      lock.unlock();
-    }
-  }
-
-  public Map<Integer, Map<String, Long>> getSeen()
+  @KafkaListener(
+      id = "${spring.kafka.consumer.client-id}",
+      idIsGroup = false,
+      topics = "${consumer.topic}",
+      autoStartup = "false")
+  public void receive(ConsumerRecord<K, V> record)
   {
-    return seen;
+    log.info(
+        "{} - {}: {}/{} - {}={}",
+        id,
+        record.offset(),
+        record.topic(),
+        record.partition(),
+        record.key(),
+        record.value()
+    );
+
+    handler.accept(record);
+
+    consumed++;
   }
 
-  public void start()
-  {
-    lock.lock();
-    try
-    {
-      if (running)
-        throw new IllegalStateException("Consumer instance " + id + " is already running!");
-
-      log.info("{} - Starting - consumed {} messages before", id, consumed);
-      running = true;
-      exception = null;
-      executor.submit(this);
-    }
-    finally
-    {
-      lock.unlock();
-    }
-  }
 
-  public synchronized void stop() throws ExecutionException, InterruptedException
+  public synchronized void start()
   {
-    lock.lock();
-    try
-    {
-      if (!running)
-        throw new IllegalStateException("Consumer instance " + id + " is not running!");
+    if (registry.getListenerContainer(id).isChildRunning())
+      throw new IllegalStateException("Consumer instance " + id + " is already running!");
 
-      log.info("{} - Stopping", id);
-      consumer.wakeup();
-      condition.await();
-      log.info("{} - Stopped - consumed {} messages so far", id, consumed);
-    }
-    finally
-    {
-      lock.unlock();
-    }
+    log.info("{} - Starting - consumed {} messages before", id, consumed);
+    errorHandler.clearException();
+    registry.getListenerContainer(id).start();
   }
 
-  @PreDestroy
-  public void destroy() throws ExecutionException, InterruptedException
+  public synchronized void stop()
   {
-    log.info("{} - Destroy!", id);
-    try
-    {
-      stop();
-    }
-    catch (IllegalStateException e)
-    {
-      log.info("{} - Was already stopped", id);
-    }
-    catch (Exception e)
-    {
-      log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
-    }
-    finally
-    {
-      log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
-    }
-  }
+    if (!registry.getListenerContainer(id).isChildRunning())
+      throw new IllegalStateException("Consumer instance " + id + " is not running!");
 
-  public boolean running()
-  {
-    lock.lock();
-    try
-    {
-      return running;
-    }
-    finally
-    {
-      lock.unlock();
-    }
+    log.info("{} - Stopping", id);
+    registry.getListenerContainer(id).stop();
+    log.info("{} - Stopped - consumed {} messages so far", id, consumed);
   }
 
-  public Optional<Exception> exitStatus()
+  public synchronized Optional<Exception> exitStatus()
   {
-    lock.lock();
-    try
-    {
-      if (running)
-        throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
+    if (registry.getListenerContainer(id).isChildRunning())
+      throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
 
-      return Optional.ofNullable(exception);
-    }
-    finally
-    {
-      lock.unlock();
-    }
+    return errorHandler.getException();
   }
 }
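
start() and stop() now drive the listener container through the KafkaListenerEndpointRegistry instead of a private consumer thread, and the rebalance callbacks survive because Spring Boot should register a ConsumerAwareRebalanceListener bean on the auto-configured container factory. The manual equivalent, as a sketch (not code from this commit):

    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

    // Sketch: what Boot's auto-configuration does with a
    // ConsumerAwareRebalanceListener bean such as EndlessConsumer --
    // registering it keeps onPartitionsAssigned() and
    // onPartitionsRevokedBeforeCommit() firing on every rebalance.
    void wireRebalanceListener(
        ConcurrentKafkaListenerContainerFactory<String, ClientMessage> factory,
        EndlessConsumer<String, ClientMessage> endlessConsumer)
    {
      factory.getContainerProperties().setConsumerRebalanceListener(endlessConsumer);
    }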
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 9f3cb81..43cde87 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -1,9 +1,5 @@
 consumer:
-  bootstrap-server: :9092
-  group-id: my-group
-  client-id: DEV
   topic: test
-  auto-offset-reset: earliest
 management:
   endpoint:
     shutdown:
@@ -19,11 +15,22 @@ management:
       enabled: true
 info:
   kafka:
-    bootstrap-server: ${consumer.bootstrap-server}
-    client-id: ${consumer.client-id}
-    group-id: ${consumer.group-id}
+    bootstrap-server: ${spring.kafka.consumer.bootstrap-servers}
+    client-id: ${spring.kafka.consumer.client-id}
+    group-id: ${spring.kafka.consumer.group-id}
     topic: ${consumer.topic}
-    auto-offset-reset: ${consumer.auto-offset-reset}
+    auto-offset-reset: ${spring.kafka.consumer.auto-offset-reset}
+spring:
+  kafka:
+    consumer:
+      bootstrap-servers: :9092
+      client-id: DEV
+      auto-offset-reset: earliest
+      group-id: my-group
+      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
+      properties:
+        spring.json.type.mapping: "message:de.juplo.kafka.ClientMessage"
+        spring.json.trusted.packages: "de.juplo.kafka"
 logging:
   level:
     root: INFO
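
The spring.json.type.mapping entry only resolves if each record carries the logical type id in its __TypeId__ header, which the test below sets by hand. A producer-side sketch (header name and mapping taken from this commit; typedRecord() is a hypothetical helper):

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.utils.Bytes;

    // Sketch: the record must carry the logical type id that
    // spring.json.type.mapping maps to de.juplo.kafka.ClientMessage.
    ProducerRecord<String, Bytes> typedRecord(String topic, String key, Bytes json)
    {
      ProducerRecord<String, Bytes> record = new ProducerRecord<>(topic, key, json);
      record.headers().add("__TypeId__", "message".getBytes());
      return record;
    }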
diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml
index b8e6780..c41e588 100644
--- a/src/main/resources/logback.xml
+++ b/src/main/resources/logback.xml
@@ -9,6 +9,7 @@
 
   <logger name="de.juplo" level="TRACE"/>
   <!-- logger name="org.apache.kafka.clients" level="DEBUG" / -->
+  <logger name="org.springframework.kafka" level="DEBUG"/>
 
   <root level="INFO">
     <appender-ref ref="STDOUT" />
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index 40dc149..3f6a6a8 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -11,10 +11,14 @@ import org.apache.kafka.common.serialization.*;
 import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.*;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Import;
+import org.springframework.context.annotation.Primary;
+import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.context.TestPropertySource;
 import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
@@ -22,10 +26,9 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
 import java.time.Duration;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -35,11 +38,16 @@ import static org.assertj.core.api.Assertions.*;
 import static org.awaitility.Awaitility.*;
 
 
-@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
+@SpringJUnitConfig(
+               initializers = ConfigDataApplicationContextInitializer.class,
+               classes = {
+                               EndlessConsumer.class,
+                               KafkaAutoConfiguration.class,
+                               ApplicationTests.Configuration.class })
 @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 @TestPropertySource(
                properties = {
-                               "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
+                               "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}",
                                "consumer.topic=" + TOPIC })
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
 @Slf4j
@@ -56,19 +64,21 @@ class ApplicationTests
        @Autowired
        KafkaProducer<String, Bytes> kafkaProducer;
        @Autowired
-       KafkaConsumer<String, Long> kafkaConsumer;
+       org.apache.kafka.clients.consumer.Consumer<String, ClientMessage> kafkaConsumer;
        @Autowired
        KafkaConsumer<Bytes, Bytes> offsetConsumer;
        @Autowired
-       ApplicationProperties properties;
+       ApplicationProperties applicationProperties;
        @Autowired
-       ExecutorService executor;
+       KafkaProperties kafkaProperties;
+       @Autowired
+       EndlessConsumer endlessConsumer;
+       @Autowired
+       RecordHandler recordHandler;
 
-       Consumer<ConsumerRecord<String, Long>> testHandler;
-       EndlessConsumer<String, Long> endlessConsumer;
        Map<TopicPartition, Long> oldOffsets;
        Map<TopicPartition, Long> newOffsets;
-       Set<ConsumerRecord<String, Long>> receivedRecords;
+       Set<ConsumerRecord<String, ClientMessage>> receivedRecords;
 
 
        /** Tests methods */
@@ -77,7 +87,7 @@ class ApplicationTests
	@Order(1) // << The poison pill is not skipped. Hence, this test must run first
        void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException
        {
-               send100Messages(i ->  new Bytes(valueSerializer.serialize(TOPIC, i)));
+               send100Messages((key, counter) -> serialize(key, counter));
 
                await("100 records received")
                                .atMost(Duration.ofSeconds(30))
@@ -100,24 +110,15 @@ class ApplicationTests
        @Order(2)
        void commitsOffsetOfErrorForReprocessingOnError()
        {
-               send100Messages(counter ->
+               send100Messages((key, counter) ->
                                counter == 77
                                                ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
-                                               : new Bytes(valueSerializer.serialize(TOPIC, counter)));
-
-               await("Consumer failed")
-                               .atMost(Duration.ofSeconds(30))
-                               .until(() -> !endlessConsumer.running());
-
-               checkSeenOffsetsForProgress();
-               compareToCommitedOffsets(newOffsets);
+                                               : serialize(key, counter));
 
-               endlessConsumer.start();
                await("Consumer failed")
                                .atMost(Duration.ofSeconds(30))
-                               .until(() -> !endlessConsumer.running());
+                               .untilAsserted(() -> checkSeenOffsetsForProgress());
 
-               checkSeenOffsetsForProgress();
                compareToCommitedOffsets(newOffsets);
                assertThat(receivedRecords.size())
                                .describedAs("Received not all sent events")
@@ -127,8 +128,8 @@ class ApplicationTests
                                .describedAs("Consumer should not be running")
                                .isThrownBy(() -> endlessConsumer.exitStatus());
                assertThat(endlessConsumer.exitStatus())
-                               .describedAs("Consumer should have exited abnormally")
-                               .containsInstanceOf(RecordDeserializationException.class);
+                               .containsInstanceOf(RecordDeserializationException.class)
+                               .describedAs("Consumer should have exited abnormally");
        }
 
 
@@ -185,7 +186,7 @@ class ApplicationTests
        }
 
 
-       void send100Messages(Function<Long, Bytes> messageGenerator)
+       void send100Messages(BiFunction<Integer, Long, Bytes> messageGenerator)
        {
                long i = 0;
 
@@ -193,7 +194,7 @@ class ApplicationTests
                {
                        for (int key = 0; key < 10; key++)
                        {
-                               Bytes value = messageGenerator.apply(++i);
+                               Bytes value = messageGenerator.apply(key, ++i);
 
                                ProducerRecord<String, Bytes> record =
                                                new ProducerRecord<>(
@@ -202,6 +203,7 @@ class ApplicationTests
                                                                Integer.toString(key%2),
                                                                value);
 
+                               record.headers().add("__TypeId__", "message".getBytes());
                                kafkaProducer.send(record, (metadata, e) ->
                                {
                                        if (metadata != null)
@@ -226,11 +228,19 @@ class ApplicationTests
                }
        }
 
+       Bytes serialize(Integer key, Long value)
+       {
+               ClientMessage message = new ClientMessage();
+               message.setClient(key.toString());
+               message.setMessage(value.toString());
+               return new Bytes(valueSerializer.serialize(TOPIC, message));
+       }
+
 
        @BeforeEach
        public void init()
        {
-               testHandler = record -> {} ;
+               recordHandler.testHandler = (record) -> {};
 
                oldOffsets = new HashMap<>();
                newOffsets = new HashMap<>();
@@ -242,24 +252,15 @@ class ApplicationTests
                        newOffsets.put(tp, offset - 1);
                });
 
-               Consumer<ConsumerRecord<String, Long>> captureOffsetAndExecuteTestHandler =
+               recordHandler.captureOffsets =
                                record ->
                                {
+                                       receivedRecords.add(record);
                                        newOffsets.put(
                                                        new TopicPartition(record.topic(), record.partition()),
                                                        record.offset());
-                                       receivedRecords.add(record);
-                                       testHandler.accept(record);
                                };
 
-               endlessConsumer =
-                               new EndlessConsumer<>(
-                                               executor,
-                                               properties.getClientId(),
-                                               properties.getTopic(),
-                                               kafkaConsumer,
-                                               captureOffsetAndExecuteTestHandler);
-
                endlessConsumer.start();
        }
 
@@ -276,22 +277,43 @@ class ApplicationTests
                }
        }
 
+       public static class RecordHandler implements Consumer<ConsumerRecord<String, ClientMessage>>
+       {
+               Consumer<ConsumerRecord<String, ClientMessage>> captureOffsets;
+               Consumer<ConsumerRecord<String, ClientMessage>> testHandler;
+
+
+               @Override
+               public void accept(ConsumerRecord<String, ClientMessage> record)
+               {
+                       captureOffsets
+                                       .andThen(testHandler)
+                                       .accept(record);
+               }
+       }
 
        @TestConfiguration
        @Import(ApplicationConfiguration.class)
        public static class Configuration
        {
+               @Primary
+               @Bean
+               public Consumer<ConsumerRecord<String, ClientMessage>> testHandler()
+               {
+                       return new RecordHandler();
+               }
+
                @Bean
-               Serializer<Long> serializer()
+               Serializer<ClientMessage> serializer()
                {
-                       return new LongSerializer();
+                       return new JsonSerializer<>();
                }
 
                @Bean
-               KafkaProducer<String, Bytes> kafkaProducer(ApplicationProperties properties)
+               KafkaProducer<String, Bytes> kafkaProducer(KafkaProperties properties)
                {
                        Properties props = new Properties();
-                       props.put("bootstrap.servers", properties.getBootstrapServer());
+                       props.put("bootstrap.servers", properties.getConsumer().getBootstrapServers());
                        props.put("linger.ms", 100);
                        props.put("key.serializer", StringSerializer.class.getName());
                        props.put("value.serializer", BytesSerializer.class.getName());
@@ -300,12 +322,12 @@ class ApplicationTests
                }
 
                @Bean
-               KafkaConsumer<Bytes, Bytes> offsetConsumer(ApplicationProperties properties)
+               KafkaConsumer<Bytes, Bytes> offsetConsumer(KafkaProperties properties)
                {
                        Properties props = new Properties();
-                       props.put("bootstrap.servers", properties.getBootstrapServer());
+                       props.put("bootstrap.servers", properties.getConsumer().getBootstrapServers());
                        props.put("client.id", "OFFSET-CONSUMER");
-                       props.put("group.id", properties.getGroupId());
+                       props.put("group.id", properties.getConsumer().getGroupId());
                        props.put("key.deserializer", BytesDeserializer.class.getName());
                        props.put("value.deserializer", BytesDeserializer.class.getName());