Merged fixes for setup/README.sh from 'deserialization' into 'stored-offsets'
author    Kai Moritz <kai@juplo.de>
          Fri, 12 Aug 2022 21:07:17 +0000 (23:07 +0200)
committer Kai Moritz <kai@juplo.de>
          Sat, 13 Aug 2022 07:22:05 +0000 (09:22 +0200)
16 files changed:
README.sh
docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/RecordHandler.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/StatisticsDocument.java [new file with mode: 0644]
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationTests.java
src/test/java/de/juplo/kafka/TestRecordHandler.java [new file with mode: 0644]

diff --git a/README.sh b/README.sh
index fe237dc..b7bce99 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -9,7 +9,7 @@ then
   exit
 fi
 
-docker-compose up -d zookeeper kafka cli
+docker-compose up -d zookeeper kafka cli mongo express
 
 if [[
   $(docker image ls -q $IMAGE) == "" ||
@@ -25,22 +25,21 @@ fi
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
 docker-compose up setup
-docker-compose up -d producer
-docker-compose up consumer &
-while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer..."; sleep 1; done
-sleep 5
-docker-compose exec -T cli bash << 'EOF'
-echo "Writing poison pill into topic test..."
-# tag::poisonpill[]
-echo 'BOOM!' | kafkacat -P -b kafka:9092 -t test
-# end::poisonpill[]
-EOF
-while [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Consumer is still running..."; sleep 1; done
-http -v :8081/actuator/health
-echo "Restarting consumer"
-http -v post :8081/start
-while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer..."; sleep 1; done
-while [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Consumer is still running..."; sleep 1; done
-http -v :8081/actuator/health
-http -v post :8081/actuator/shutdown
-docker-compose stop producer
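+# peter and beate are two instances of the same consumer group; their offsets
+# and key statistics are stored in MongoDB instead of being committed to Kafka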
+docker-compose up -d producer peter beate
+
+sleep 15
+
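+# Stop beate (port 8082), so that peter takes over all partitions; then kill
+# peter hard (SIGKILL - no chance to flush state) and restart beate, which
+# must resume from the offsets last stored in MongoDB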
+http -v post :8082/stop
+sleep 10
+docker-compose kill -s 9 peter
+http -v post :8082/start
+sleep 60
+
+docker-compose stop producer peter beate
+docker-compose logs beate
+docker-compose logs --tail=10 peter
diff --git a/docker-compose.yml b/docker-compose.yml
index 81b98ac..7ab77b2 100644 (file)
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -24,6 +24,27 @@ services:
     depends_on:
       - zookeeper
 
+  mongo:
+    image: mongo:4.4.13
+    ports:
+      - 27017:27017
+    environment:
+      MONGO_INITDB_ROOT_USERNAME: juplo
+      MONGO_INITDB_ROOT_PASSWORD: training
+
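+  # mongo-express: web UI for browsing MongoDB, handy for inspecting the
+  # stored offsets and statistics at http://localhost:8090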
+  express:
+    image: mongo-express
+    ports:
+      - 8090:8081
+    environment:
+      ME_CONFIG_MONGODB_ADMINUSERNAME: juplo
+      ME_CONFIG_MONGODB_ADMINPASSWORD: training
+      ME_CONFIG_MONGODB_URL: mongodb://juplo:training@mongo:27017/
+    depends_on:
+      - mongo
+
   setup:
     image: juplo/toolbox
     command: >
@@ -45,16 +64,29 @@ services:
       producer.bootstrap-server: kafka:9092
       producer.client-id: producer
       producer.topic: test
-      producer.throttle-ms: 200
+      producer.throttle-ms: 500
 
 
-  consumer:
+  peter:
     image: juplo/endless-consumer:1.0-SNAPSHOT
     ports:
       - 8081:8080
     environment:
       server.port: 8080
       consumer.bootstrap-server: kafka:9092
-      consumer.client-id: my-group
-      consumer.client-id: consumer
+      consumer.client-id: peter
+      consumer.topic: test
+      spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
+      spring.data.mongodb.database: juplo
+
+  beate:
+    image: juplo/endless-consumer:1.0-SNAPSHOT
+    ports:
+      - 8082:8080
+    environment:
+      server.port: 8080
+      consumer.bootstrap-server: kafka:9092
+      consumer.client-id: beate
       consumer.topic: test
+      spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
+      spring.data.mongodb.database: juplo
diff --git a/pom.xml b/pom.xml
index 1f5caab..701704d 100644 (file)
--- a/pom.xml
+++ b/pom.xml
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-data-mongodb</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-validation</artifactId>
       <artifactId>awaitility</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>de.flapdoodle.embed</groupId>
+      <artifactId>de.flapdoodle.embed.mongo</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java
index b5bd1b9..76c2520 100644 (file)
--- a/src/main/java/de/juplo/kafka/Application.java
+++ b/src/main/java/de/juplo/kafka/Application.java
@@ -8,7 +8,6 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 
 import javax.annotation.PreDestroy;
-import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -31,8 +30,22 @@ public class Application implements ApplicationRunner
   }
 
   @PreDestroy
-  public void stopExecutor()
+  public void shutdown()
   {
+    try
+    {
+      log.info("Stopping EndlessConsumer");
+      endlessConsumer.stop();
+    }
+    catch (IllegalStateException e)
+    {
+      log.info("Was already stopped: {}", e.toString());
+    }
+    catch (Exception e)
+    {
+      log.error("Unexpected exception while stopping EndlessConsumer: {}", e);
+    }
+
     try
     {
       log.info("Shutting down the ExecutorService.");
@@ -42,7 +55,7 @@ public class Application implements ApplicationRunner
     }
     catch (InterruptedException e)
     {
-      log.error("Exception while waiting for the termination of the ExecutorService: {}", e.toString());
+      log.error("Exception while waiting for the termination of the ExecutorService: {}", e);
     }
     finally
     {
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 766740b..3925fcb 100644 (file)
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -1,6 +1,6 @@
 package de.juplo.kafka;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -8,10 +8,10 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+import java.time.Clock;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.function.Consumer;
 
 
 @Configuration
@@ -19,19 +19,34 @@ import java.util.function.Consumer;
 public class ApplicationConfiguration
 {
   @Bean
-  public Consumer<ConsumerRecord<String, Long>> consumer()
+  public KeyCountingRecordHandler keyCountingRecordHandler()
   {
-    return (record) ->
-    {
-      // Handle record
-    };
+    return new KeyCountingRecordHandler();
+  }
+
+  @Bean
+  public KeyCountingRebalanceListener keyCountingRebalanceListener(
+      KeyCountingRecordHandler keyCountingRecordHandler,
+      PartitionStatisticsRepository repository,
+      Consumer<String, Long> consumer,
+      ApplicationProperties properties)
+  {
+    return new KeyCountingRebalanceListener(
+        keyCountingRecordHandler,
+        repository,
+        properties.getClientId(),
+        properties.getTopic(),
+        Clock.systemDefaultZone(),
+        properties.getCommitInterval(),
+        consumer);
   }
 
   @Bean
   public EndlessConsumer<String, Long> endlessConsumer(
       KafkaConsumer<String, Long> kafkaConsumer,
       ExecutorService executor,
-      Consumer<ConsumerRecord<String, Long>> handler,
+      KeyCountingRebalanceListener keyCountingRebalanceListener,
+      KeyCountingRecordHandler keyCountingRecordHandler,
       ApplicationProperties properties)
   {
     return
@@ -40,7 +55,8 @@ public class ApplicationConfiguration
             properties.getClientId(),
             properties.getTopic(),
             kafkaConsumer,
-            handler);
+            keyCountingRebalanceListener,
+            keyCountingRecordHandler);
   }
 
   @Bean
@@ -55,8 +71,10 @@ public class ApplicationConfiguration
     Properties props = new Properties();
 
     props.put("bootstrap.servers", properties.getBootstrapServer());
+    props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
     props.put("group.id", properties.getGroupId());
     props.put("client.id", properties.getClientId());
+    props.put("enable.auto.commit", false);
     props.put("auto.offset.reset", properties.getAutoOffsetReset());
     props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
     props.put("metadata.max.age.ms", "1000");
diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java
index ed38080..f6ff47f 100644 (file)
--- a/src/main/java/de/juplo/kafka/DriverController.java
+++ b/src/main/java/de/juplo/kafka/DriverController.java
@@ -2,11 +2,7 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import org.springframework.http.HttpStatus;
-import org.springframework.web.bind.annotation.ExceptionHandler;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.ResponseStatus;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.bind.annotation.*;
 
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -17,6 +13,7 @@ import java.util.concurrent.ExecutionException;
 public class DriverController
 {
   private final EndlessConsumer consumer;
+  private final KeyCountingRecordHandler keyCountingRecordHandler;
 
 
   @PostMapping("start")
@@ -35,7 +32,7 @@ public class DriverController
   @GetMapping("seen")
   public Map<Integer, Map<String, Long>> seen()
   {
-    return consumer.getSeen();
+    return keyCountingRecordHandler.getSeen();
   }
 
 
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index 8802df9..58557f2 100644 (file)
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -19,13 +19,14 @@ import java.util.concurrent.locks.ReentrantLock;
 
 @Slf4j
 @RequiredArgsConstructor
-public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnable
+public class EndlessConsumer<K, V> implements Runnable
 {
   private final ExecutorService executor;
   private final String id;
   private final String topic;
   private final Consumer<K, V> consumer;
-  private final java.util.function.Consumer<ConsumerRecord<K, V>> handler;
+  private final PollIntervalAwareConsumerRebalanceListener pollIntervalAwareRebalanceListener;
+  private final RecordHandler<K, V> handler;
 
   private final Lock lock = new ReentrantLock();
   private final Condition condition = lock.newCondition();
@@ -33,50 +34,6 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
   private Exception exception;
   private long consumed = 0;
 
-  private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
-  private final Map<Integer, Long> offsets = new HashMap<>();
-
-
-  @Override
-  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(tp ->
-    {
-      Integer partition = tp.partition();
-      Long newOffset = consumer.position(tp);
-      Long oldOffset = offsets.remove(partition);
-      log.info(
-          "{} - removing partition: {}, consumed {} records (offset {} -> {})",
-          id,
-          partition,
-          newOffset - oldOffset,
-          oldOffset,
-          newOffset);
-      Map<String, Long> removed = seen.remove(partition);
-      for (String key : removed.keySet())
-      {
-        log.info(
-            "{} - Seen {} messages for partition={}|key={}",
-            id,
-            removed.get(key),
-            partition,
-            key);
-      }
-    });
-  }
-
-  @Override
-  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(tp ->
-    {
-      Integer partition = tp.partition();
-      Long offset = consumer.position(tp);
-      log.info("{} - adding partition: {}, offset={}", id, partition, offset);
-      offsets.put(partition, offset);
-      seen.put(partition, new HashMap<>());
-    });
-  }
 
 
   @Override
@@ -85,7 +42,7 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
     try
     {
       log.info("{} - Subscribing to topic {}", id, topic);
-      consumer.subscribe(Arrays.asList(topic), this);
+      consumer.subscribe(Arrays.asList(topic), pollIntervalAwareRebalanceListener);
 
       while (true)
       {
@@ -109,24 +66,14 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
           handler.accept(record);
 
           consumed++;
-
-          Integer partition = record.partition();
-          String key = record.key() == null ? "NULL" : record.key().toString();
-          Map<String, Long> byKey = seen.get(partition);
-
-          if (!byKey.containsKey(key))
-            byKey.put(key, 0l);
-
-          long seenByKey = byKey.get(key);
-          seenByKey++;
-          byKey.put(key, seenByKey);
         }
+
+        pollIntervalAwareRebalanceListener.beforeNextPoll();
       }
     }
     catch(WakeupException e)
     {
       log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
-      consumer.commitSync();
       shutdown();
     }
     catch(RecordDeserializationException e)
@@ -140,7 +87,6 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
           offset,
           e.getCause().toString());
 
-      consumer.commitSync();
       shutdown(e);
     }
     catch(Exception e)
@@ -190,11 +136,6 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
     }
   }
 
-  public Map<Integer, Map<String, Long>> getSeen()
-  {
-    return seen;
-  }
-
   public void start()
   {
     lock.lock();
@@ -214,7 +155,7 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
     }
   }
 
-  public synchronized void stop() throws ExecutionException, InterruptedException
+  public synchronized void stop() throws InterruptedException
   {
     lock.lock();
     try
@@ -237,22 +178,7 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
   public void destroy() throws ExecutionException, InterruptedException
   {
     log.info("{} - Destroy!", id);
-    try
-    {
-      stop();
-    }
-    catch (IllegalStateException e)
-    {
-      log.info("{} - Was already stopped", id);
-    }
-    catch (Exception e)
-    {
-      log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
-    }
-    finally
-    {
-      log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
-    }
+    log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
   }
 
   public boolean running()
diff --git a/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java b/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java
new file mode 100644 (file)
index 0000000..4a2c036
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java
@@ -0,0 +1,83 @@
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Map;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
+{
+  private final KeyCountingRecordHandler handler;
+  private final PartitionStatisticsRepository repository;
+  private final String id;
+  private final String topic;
+  private final Clock clock;
+  private final Duration commitInterval;
+  private final Consumer<String, Long> consumer;
+
+  private Instant lastCommit = Instant.EPOCH;
+
+  @Override
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(tp ->
+    {
+      Integer partition = tp.partition();
+      Long offset = consumer.position(tp);
+      log.info("{} - adding partition: {}, offset={}", id, partition, offset);
+      StatisticsDocument document =
+          repository
+              .findById(Integer.toString(partition))
+              .orElse(new StatisticsDocument(partition));
+      if (document.offset >= 0)
+      {
+        // Only seek, if a stored offset was found
+        // Otherwise: Use initial offset, generated by Kafka
+        consumer.seek(tp, document.offset);
+      }
+      handler.addPartition(partition, document.statistics);
+    });
+  }
+
+  @Override
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(tp ->
+    {
+      Integer partition = tp.partition();
+      Long newOffset = consumer.position(tp);
+      log.info(
+          "{} - removing partition: {}, offset of next message {})",
+          id,
+          partition,
+          newOffset);
+      Map<String, Long> removed = handler.removePartition(partition);
+      repository.save(new StatisticsDocument(partition, removed, newOffset));
+    });
+  }
+
+
+  @Override
+  public void beforeNextPoll()
+  {
+    if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
+    {
+      log.debug("Storing data and offsets, last commit: {}", lastCommit);
+      handler.getSeen().forEach((partition, statistics) -> repository.save(
+          new StatisticsDocument(
+              partition,
+              statistics,
+              consumer.position(new TopicPartition(topic, partition)))));
+      lastCommit = clock.instant();
+    }
+  }
+}
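The listener above hinges on the semantics of Consumer.position(): it returns the offset of the next record that poll() would deliver, so the value saved in onPartitionsRevoked() (or by the interval-based flush in beforeNextPoll()) can be passed to seek() verbatim on the next assignment. A condensed sketch of the handover, using this commit's classes with invented values (consumer, tp, statistics and repository as in the listener above):

// Partition 0 is revoked after the records 0..41 have been processed:
long next = consumer.position(tp);                         // -> 42
repository.save(new StatisticsDocument(0, statistics, next));

// Later - possibly in another instance - partition 0 is assigned again:
StatisticsDocument document = repository.findById("0").get();
if (document.offset >= 0)                                  // guard for fresh documents
  consumer.seek(tp, document.offset);                      // next poll() starts at offset 42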
diff --git a/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java b/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java
new file mode 100644 (file)
index 0000000..099dcf7
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java
@@ -0,0 +1,46 @@
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+@Slf4j
+public class KeyCountingRecordHandler implements RecordHandler<String, Long>
+{
+  private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
+
+
+  @Override
+  public void accept(ConsumerRecord<String, Long> record)
+  {
+    Integer partition = record.partition();
+    String key = record.key() == null ? "NULL" : record.key().toString();
+    Map<String, Long> byKey = seen.get(partition);
+
+    if (!byKey.containsKey(key))
+      byKey.put(key, 0L);
+
+    long seenByKey = byKey.get(key);
+    seenByKey++;
+    byKey.put(key, seenByKey);
+  }
+
+  public void addPartition(Integer partition, Map<String, Long> statistics)
+  {
+    seen.put(partition, statistics);
+  }
+
+  public Map<String, Long> removePartition(Integer partition)
+  {
+    return seen.remove(partition);
+  }
+
+
+  public Map<Integer, Map<String, Long>> getSeen()
+  {
+    return seen;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java b/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java
new file mode 100644 (file)
index 0000000..0ccf3cd
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+import org.springframework.data.mongodb.repository.MongoRepository;
+
+import java.util.Optional;
+
+
+public interface PartitionStatisticsRepository extends MongoRepository<StatisticsDocument, String>
+{
+  public Optional<StatisticsDocument> findById(String partition);
+}
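Note that save() and findById() already come with MongoRepository; the declaration above merely restates findById() with the concrete types. A usage sketch with invented values:

class PartitionStatisticsRepositoryUsage
{
  static void example(PartitionStatisticsRepository repository)
  {
    StatisticsDocument document = repository
        .findById("0")                      // the _id is the partition number as a String
        .orElse(new StatisticsDocument(0)); // fresh document: offset = -1, empty statistics
    document.statistics.put("beate", 42L);
    document.offset = 4711L;
    repository.save(document);              // upsert, keyed on the _id
  }
}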
diff --git a/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java b/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java
new file mode 100644 (file)
index 0000000..8abec12
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java
@@ -0,0 +1,9 @@
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+
+
+public interface PollIntervalAwareConsumerRebalanceListener extends ConsumerRebalanceListener
+{
+  default void beforeNextPoll() {}
+}
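The extra hook exists because KafkaConsumer is not thread-safe: storing offsets via the consumer's position() has to happen on the polling thread itself. EndlessConsumer therefore calls beforeNextPoll() once per loop iteration, after the current batch has been handled. Schematically (condensed from the EndlessConsumer diff above; the poll() call lies outside the shown hunks, its timeout here is illustrative):

while (true)
{
  ConsumerRecords<K, V> records = consumer.poll(Duration.ofSeconds(1));

  for (ConsumerRecord<K, V> record : records)
    handler.accept(record);

  // Runs on the consumer thread, between two calls to poll()
  pollIntervalAwareRebalanceListener.beforeNextPoll();
}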
diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java
new file mode 100644 (file)
index 0000000..3c9dd15
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/RecordHandler.java
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.function.Consumer;
+
+
+public interface RecordHandler<K, V> extends Consumer<ConsumerRecord<K,V>>
+{
+  default void beforeNextPoll() {}
+}
diff --git a/src/main/java/de/juplo/kafka/StatisticsDocument.java b/src/main/java/de/juplo/kafka/StatisticsDocument.java
new file mode 100644 (file)
index 0000000..1244f45
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/StatisticsDocument.java
@@ -0,0 +1,36 @@
+package de.juplo.kafka;
+
+import lombok.ToString;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+@Document(collection = "statistics")
+@ToString
+public class StatisticsDocument
+{
+  @Id
+  public String id;
+  public long offset = -1L;
+  public Map<String, Long> statistics;
+
+  public StatisticsDocument()
+  {
+  }
+
+  public StatisticsDocument(Integer partition)
+  {
+    this.id = Integer.toString(partition);
+    this.statistics = new HashMap<>();
+  }
+
+  public StatisticsDocument(Integer partition, Map<String, Long> statistics, long offset)
+  {
+    this.id = Integer.toString(partition);
+    this.statistics = statistics;
+    this.offset = offset;
+  }
+}
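With no custom converters registered, Spring Data MongoDB maps the public fields one-to-one. A sketch of the resulting document for partition 0 with invented values (Spring Data also adds its _class type marker by default):

new StatisticsDocument(0, Map.of("peter", 3L, "beate", 2L), 42L);
// -> { "_id": "0", "offset": 42,
//      "statistics": { "peter": 3, "beate": 2 },
//      "_class": "de.juplo.kafka.StatisticsDocument" }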
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index f8bfe7e..fc1c68a 100644 (file)
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -25,6 +25,11 @@ info:
     group-id: ${consumer.group-id}
     topic: ${consumer.topic}
     auto-offset-reset: ${consumer.auto-offset-reset}
+spring:
+  data:
+    mongodb:
+      uri: mongodb://juplo:training@localhost:27017
+      database: juplo
 logging:
   level:
     root: INFO
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index 3bac537..fc5d4c9 100644 (file)
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -11,6 +11,8 @@ import org.apache.kafka.common.serialization.*;
 import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.*;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
@@ -24,7 +26,6 @@ import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.function.BiConsumer;
-import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -40,8 +41,11 @@ import static org.awaitility.Awaitility.*;
                properties = {
                                "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
                                "consumer.topic=" + TOPIC,
-                               "consumer.commit-interval=1s" })
+                               "consumer.commit-interval=1s",
+                               "spring.mongodb.embedded.version=4.4.13" })
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
+@EnableAutoConfiguration
+@AutoConfigureDataMongo
 @Slf4j
 class ApplicationTests
 {
@@ -60,11 +64,18 @@ class ApplicationTests
        @Autowired
        KafkaConsumer<Bytes, Bytes> offsetConsumer;
        @Autowired
+       PartitionStatisticsRepository partitionStatisticsRepository;
+       @Autowired
        ApplicationProperties properties;
        @Autowired
        ExecutorService executor;
+       @Autowired
+       PartitionStatisticsRepository repository;
+       @Autowired
+       KeyCountingRebalanceListener keyCountingRebalanceListener;
+       @Autowired
+       KeyCountingRecordHandler keyCountingRecordHandler;
 
-       Consumer<ConsumerRecord<String, Long>> testHandler;
        EndlessConsumer<String, Long> endlessConsumer;
        Map<TopicPartition, Long> oldOffsets;
        Map<TopicPartition, Long> newOffsets;
@@ -180,24 +191,30 @@ class ApplicationTests
        void seekToEnd()
        {
                offsetConsumer.assign(partitions());
-               offsetConsumer.seekToEnd(partitions());
                partitions().forEach(tp ->
                {
-                       // seekToEnd() works lazily: it only takes effect on poll()/position()
                        Long offset = offsetConsumer.position(tp);
                        log.info("New position for {}: {}", tp, offset);
+                       Integer partition = tp.partition();
+                       StatisticsDocument document =
+                                       partitionStatisticsRepository
+                                                       .findById(partition.toString())
+                                                       .orElse(new StatisticsDocument(partition));
+                       document.offset = offset;
+                       partitionStatisticsRepository.save(document);
                });
-               // The new positions must be commited!
-               offsetConsumer.commitSync();
                offsetConsumer.unsubscribe();
        }
 
        void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
        {
-               offsetConsumer.assign(partitions());
-               partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
-               offsetConsumer.unsubscribe();
-       }
+               partitions().forEach(tp ->
+               {
+                       String partition = Integer.toString(tp.partition());
+                       Optional<Long> offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset);
+                       consumer.accept(tp, offset.orElse(0L));
+               });
+       }
 
        List<TopicPartition> partitions()
        {
@@ -253,8 +270,6 @@ class ApplicationTests
        @BeforeEach
        public void init()
        {
-               testHandler = record -> {} ;
-
                seekToEnd();
 
                oldOffsets = new HashMap<>();
@@ -267,14 +282,16 @@ class ApplicationTests
                        newOffsets.put(tp, offset - 1);
                });
 
-               Consumer<ConsumerRecord<String, Long>> captureOffsetAndExecuteTestHandler =
-                               record ->
-                               {
-                                       newOffsets.put(
-                                                       new TopicPartition(record.topic(), record.partition()),
-                                                       record.offset());
-                                       receivedRecords.add(record);
-                                       testHandler.accept(record);
+               TestRecordHandler<String, Long> captureOffsetAndExecuteTestHandler =
+                               new TestRecordHandler<String, Long>(keyCountingRecordHandler) {
+                                       @Override
+                                       public void onNewRecord(ConsumerRecord<String, Long> record)
+                                       {
+                                               newOffsets.put(
+                                                               new TopicPartition(record.topic(), record.partition()),
+                                                               record.offset());
+                                               receivedRecords.add(record);
+                                       }
                                };
 
                endlessConsumer =
@@ -283,6 +300,7 @@ class ApplicationTests
                                                properties.getClientId(),
                                                properties.getTopic(),
                                                kafkaConsumer,
+                                               keyCountingRebalanceListener,
                                                captureOffsetAndExecuteTestHandler);
 
                endlessConsumer.start();
@@ -330,7 +348,8 @@ class ApplicationTests
                        Properties props = new Properties();
                        props.put("bootstrap.servers", properties.getBootstrapServer());
                        props.put("client.id", "OFFSET-CONSUMER");
-                       props.put("group.id", properties.getGroupId());
+                       props.put("enable.auto.commit", false);
+                       props.put("auto.offset.reset", "latest");
                        props.put("key.deserializer", BytesDeserializer.class.getName());
                        props.put("value.deserializer", BytesDeserializer.class.getName());
 
diff --git a/src/test/java/de/juplo/kafka/TestRecordHandler.java b/src/test/java/de/juplo/kafka/TestRecordHandler.java
new file mode 100644 (file)
index 0000000..de28385
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/TestRecordHandler.java
@@ -0,0 +1,28 @@
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+
+@RequiredArgsConstructor
+public abstract class TestRecordHandler<K, V> implements RecordHandler<K, V>
+{
+  private final RecordHandler<K, V> handler;
+
+
+  public abstract void onNewRecord(ConsumerRecord<K, V> record);
+
+
+  @Override
+  public void accept(ConsumerRecord<K, V> record)
+  {
+    this.onNewRecord(record);
+    handler.accept(record);
+  }
+
+  @Override
+  public void beforeNextPoll()
+  {
+    handler.beforeNextPoll();
+  }
+}