Merge branch 'rebalance-listener' into stored-state
author     Kai Moritz <kai@juplo.de>    Thu, 7 Apr 2022 23:13:01 +0000 (01:13 +0200)
committer  Kai Moritz <kai@juplo.de>    Thu, 7 Apr 2022 23:13:01 +0000 (01:13 +0200)
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/StatisticsDocument.java

@@@ -5,7 -5,6 +5,6 @@@ import org.springframework.boot.SpringA
  import org.springframework.boot.autoconfigure.SpringBootApplication;
  import org.springframework.boot.context.properties.EnableConfigurationProperties;
  import org.springframework.context.annotation.Bean;
- import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
  import org.springframework.util.Assert;
  
  import java.util.concurrent.Executors;
@@@ -20,7 -19,7 +19,7 @@@ public class Applicatio
  
  
    @Bean
 -  public EndlessConsumer consumer()
 +  public EndlessConsumer consumer(PartitionStatisticsRepository repository)
    {
      Assert.hasText(properties.getBootstrapServer(), "consumer.bootstrap-server must be set");
      Assert.hasText(properties.getGroupId(), "consumer.group-id must be set");
@@@ -30,7 -29,6 +29,7 @@@
      EndlessConsumer consumer =
          new EndlessConsumer(
              Executors.newFixedThreadPool(1),
 +            repository,
              properties.getBootstrapServer(),
              properties.getGroupId(),
              properties.getClientId(),
      return consumer;
    }
  
-   @Bean
-   public Jackson2ObjectMapperBuilder jackson2ObjectMapperBuilder()
-   {
-     return
-         new Jackson2ObjectMapperBuilder().serializers(
-             new TopicPartitionSerializer(),
-             new PartitionStatisticsSerializer());
-   }
    public static void main(String[] args)
    {
      SpringApplication.run(Application.class, args);
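
Note: the PartitionStatisticsRepository injected into the consumer above is not part of this diff. A minimal sketch of what such a repository could look like, assuming plain Spring Data MongoDB and the StatisticsDocument entity shown further down (the actual interface in the repository may differ):

package de.juplo.kafka;

import org.springframework.data.mongodb.repository.MongoRepository;

// Sketch only: a Spring Data MongoDB repository providing the findById()/save()
// calls used by EndlessConsumer. Spring Data generates the implementation at
// runtime; no further code is required.
public interface PartitionStatisticsRepository
    extends MongoRepository<StatisticsDocument, String>
{
}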
@@@ -22,7 -22,6 +22,7 @@@ import java.util.concurrent.atomic.Atom
  public class EndlessConsumer implements Runnable
  {
    private final ExecutorService executor;
 +  private final PartitionStatisticsRepository repository;
    private final String bootstrapServer;
    private final String groupId;
    private final String id;
    private KafkaConsumer<String, String> consumer = null;
    private Future<?> future = null;
  
-   private final Map<TopicPartition, PartitionStatistics> seen = new HashMap<>();
+   private final Map<Integer, Map<String, Integer>> seen = new HashMap<>();
  
  
    public EndlessConsumer(
        ExecutorService executor,
 +      PartitionStatisticsRepository repository,
        String bootstrapServer,
        String groupId,
        String clientId,
@@@ -47,7 -45,6 +47,7 @@@
        String autoOffsetReset)
    {
      this.executor = executor;
 +    this.repository = repository;
      this.bootstrapServer = bootstrapServer;
      this.groupId = groupId;
      this.id = clientId;
            partitions.forEach(tp ->
            {
              log.info("{} - removing partition: {}", id, tp);
-             PartitionStatistics removed = seen.remove(tp);
-             for (KeyCounter counter : removed.getStatistics())
+             Map<String, Integer> removed = seen.remove(tp.partition());
+             for (String key : removed.keySet())
              {
                log.info(
                    "{} - Seen {} messages for partition={}|key={}",
                    id,
-                   counter.getResult(),
-                   removed.getPartition(),
-                   counter.getKey());
+                   removed.get(key),
+                   tp.partition(),
+                   key);
              }
-             repository.save(new StatisticsDocument(removed));
++            repository.save(new StatisticsDocument(tp.partition(), removed));
            });
          }
  
            partitions.forEach(tp ->
            {
              log.info("{} - adding partition: {}", id, tp);
 -            seen.put(tp.partition(), new HashMap<>());
 +            seen.put(
-                 tp,
++                tp.partition(),
 +                repository
-                     .findById(tp.toString())
-                     .map(PartitionStatistics::new)
-                     .orElse(new PartitionStatistics(tp)));
++                    .findById(Integer.toString(tp.partition()))
++                    .map(document -> document.statistics)
++                    .orElse(new HashMap<>()));
            });
          }
        });
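
The two blocks above are the onPartitionsRevoked() and onPartitionsAssigned() callbacks of a ConsumerRebalanceListener: counters of a revoked partition are written to MongoDB, counters of a newly assigned partition are read back (or start empty). A condensed, hypothetical sketch of that pattern, with the logging and the surrounding EndlessConsumer code stripped away:

package de.juplo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: how the save-on-revoke / restore-on-assign pattern from the
// diff above fits into KafkaConsumer.subscribe(). Field names follow the diff.
public class RebalanceSketch
{
  private final Map<Integer, Map<String, Integer>> seen = new HashMap<>();

  void subscribe(
      KafkaConsumer<String, String> consumer,
      String topic,
      PartitionStatisticsRepository repository)
  {
    consumer.subscribe(List.of(topic), new ConsumerRebalanceListener()
    {
      @Override
      public void onPartitionsRevoked(Collection<TopicPartition> partitions)
      {
        // Persist the counters of every partition that is taken away.
        partitions.forEach(tp ->
            repository.save(
                new StatisticsDocument(tp.partition(), seen.remove(tp.partition()))));
      }

      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions)
      {
        // Restore previously stored counters, or start with an empty map.
        partitions.forEach(tp ->
            seen.put(
                tp.partition(),
                repository
                    .findById(Integer.toString(tp.partition()))
                    .map(document -> document.statistics)
                    .orElse(new HashMap<>())));
      }
    });
  }
}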
                record.value()
            );
  
-           TopicPartition partition = new TopicPartition(record.topic(), record.partition());
+           Integer partition = record.partition();
            String key = record.key() == null ? "NULL" : record.key();
-           seen.get(partition).increment(key);
+           Map<String, Integer> byKey = seen.get(partition);
+           if (!byKey.containsKey(key))
+             byKey.put(key, 0);
+           int seenByKey = byKey.get(key);
+           seenByKey++;
+           byKey.put(key, seenByKey);
          }
        }
      }
      }
    }
  
-   public Map<TopicPartition, PartitionStatistics> getSeen()
+   public Map<Integer, Map<String, Integer>> getSeen()
    {
      return seen;
    }
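
The per-key counting in the poll loop above (containsKey()/get()/put()) can also be written more compactly. A behaviour-equivalent sketch, not part of this merge, using computeIfAbsent() and merge(); SeenCounter is a hypothetical name, not a class in the repository:

package de.juplo.kafka;

import java.util.HashMap;
import java.util.Map;

// Sketch only: the same per-partition, per-key counting as in the diff,
// condensed with computeIfAbsent() and merge().
public class SeenCounter
{
  private final Map<Integer, Map<String, Integer>> seen = new HashMap<>();

  public void count(int partition, String key)
  {
    seen
        .computeIfAbsent(partition, p -> new HashMap<>())
        .merge(key == null ? "NULL" : key, 1, Integer::sum);
  }

  public Map<Integer, Map<String, Integer>> getSeen()
  {
    return seen;
  }
}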
index 9318c4c,0000000..be998ca
mode 100644,000000..100644
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/StatisticsDocument.java
@@@ -1,39 -1,0 +1,28 @@@
 +package de.juplo.kafka;
 +
 +import lombok.ToString;
 +import org.springframework.data.annotation.Id;
 +import org.springframework.data.mongodb.core.mapping.Document;
 +
 +import java.util.HashMap;
 +import java.util.Map;
 +
 +
 +@Document(collection = "statistics")
 +@ToString
 +public class StatisticsDocument
 +{
 +  @Id
 +  public String id;
-   public String topic;
-   public Integer partition;
-   public Map<String, Long> statistics;
++  public Map<String, Integer> statistics;
 +
 +  public StatisticsDocument()
 +  {
 +  }
 +
-   public StatisticsDocument(String topic, Integer partition, Map<String, Long> statistics)
++  public StatisticsDocument(Integer partition, Map<String, Integer> statistics)
 +  {
-     this.partition = partition;
++    this.id = Integer.toString(partition);
 +    this.statistics = statistics;
 +  }
-
-   public StatisticsDocument(PartitionStatistics statistics)
-   {
-     this.topic = statistics.getPartition().topic();
-     this.id = statistics.toString();
-     this.partition = statistics.getPartition().partition();
-     this.statistics = new HashMap<>();
-     statistics.getStatistics().forEach(counter -> this.statistics.put(counter.getKey(), counter.getResult()));
-   }
 +}
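
After the merge, each partition is stored as one document in the "statistics" collection, with the partition number as its id and the per-key counters as a nested map. A hypothetical round trip through the repository (counter values invented for illustration):

package de.juplo.kafka;

import java.util.HashMap;
import java.util.Map;

// Sketch only: how EndlessConsumer uses the merged entity.
public class StatisticsRoundTrip
{
  static void example(PartitionStatisticsRepository repository)
  {
    Map<String, Integer> counters = new HashMap<>();
    counters.put("peter", 3);
    counters.put("klaus", 1);

    // Persisted under id "0" when partition 0 is revoked...
    repository.save(new StatisticsDocument(0, counters));

    // ...and restored by the same id when the partition is assigned again.
    Map<String, Integer> restored =
        repository
            .findById("0")
            .map(document -> document.statistics)
            .orElse(new HashMap<>());
  }
}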