Merge der Refaktorisierung des EndlessConsumer (Branch 'stored-state')
authorKai Moritz <kai@juplo.de>
Sun, 24 Jul 2022 13:35:14 +0000 (15:35 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 24 Jul 2022 14:10:19 +0000 (16:10 +0200)
1  2 
docker-compose.yml
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/StatisticsDocument.java

Simple merge
@@@ -35,30 -33,59 +33,58 @@@ public class EndlessConsumer<K, V> impl
    private boolean running = false;
    private Exception exception;
    private long consumed = 0;
-   private KafkaConsumer<String, String> consumer = null;
  
+   private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
+   private final Map<Integer, Long> offsets = new HashMap<>();
  
-   private final Map<Integer, Map<String, Integer>> seen = new HashMap<>();
  
 -      repository.save(new StatisticsDocument(partition, removed));
+   @Override
+   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+   {
+     partitions.forEach(tp ->
+     {
+       Integer partition = tp.partition();
+       Long newOffset = consumer.position(tp);
+       Long oldOffset = offsets.remove(partition);
+       log.info(
+           "{} - removing partition: {}, consumed {} records (offset {} -> {})",
+           id,
+           partition,
+           newOffset - oldOffset,
+           oldOffset,
+           newOffset);
+       Map<String, Long> removed = seen.remove(partition);
+       for (String key : removed.keySet())
+       {
+         log.info(
+             "{} - Seen {} messages for partition={}|key={}",
+             id,
+             removed.get(key),
+             partition,
+             key);
+       }
++      repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
+     });
+   }
  
-   public EndlessConsumer(
-       ExecutorService executor,
-       PartitionStatisticsRepository repository,
-       String bootstrapServer,
-       String groupId,
-       String clientId,
-       String topic,
-       String autoOffsetReset)
+   @Override
+   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
    {
-     this.executor = executor;
-     this.repository = repository;
-     this.bootstrapServer = bootstrapServer;
-     this.groupId = groupId;
-     this.id = clientId;
-     this.topic = topic;
-     this.autoOffsetReset = autoOffsetReset;
+     partitions.forEach(tp ->
+     {
+       Integer partition = tp.partition();
+       Long offset = consumer.position(tp);
+       log.info("{} - adding partition: {}, offset={}", id, partition, offset);
 -      offsets.put(partition, offset);
 -      seen.put(
 -          partition,
++      StatisticsDocument document =
+           repository
 -              .findById(Integer.toString(tp.partition()))
 -              .map(document -> document.statistics)
 -              .orElse(new HashMap<>()));
++              .findById(Integer.toString(partition))
++              .orElse(new StatisticsDocument(partition));
++      consumer.seek(tp, document.offset);
++      seen.put(partition, document.statistics);
+     });
    }
  
    @Override
    public void run()
    {
@@@ -14,20 -14,13 +14,20 @@@ public class StatisticsDocumen
  {
    @Id
    public String id;
-   public Map<String, Integer> statistics;
 +  public long offset;
+   public Map<String, Long> statistics;
  
    public StatisticsDocument()
    {
    }
  
 -  public StatisticsDocument(Integer partition, Map<String, Long> statistics)
 +  public StatisticsDocument(Integer partition)
 +  {
 +    this.id = Integer.toString(partition);
 +    this.statistics = new HashMap<>();
 +  }
 +
-   public StatisticsDocument(Integer partition, Map<String, Integer> statistics, long offset)
++  public StatisticsDocument(Integer partition, Map<String, Long> statistics, long offset)
    {
      this.id = Integer.toString(partition);
      this.statistics = statistics;