Merge branch 'rebalance-listener' into stored-state
authorKai Moritz <kai@juplo.de>
Thu, 7 Apr 2022 23:13:01 +0000 (01:13 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 7 Apr 2022 23:13:01 +0000 (01:13 +0200)
README.sh
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/KeyCounter.java [deleted file]
src/main/java/de/juplo/kafka/PartitionStatistics.java [deleted file]
src/main/java/de/juplo/kafka/PartitionStatisticsSerializer.java [deleted file]
src/main/java/de/juplo/kafka/StatisticsDocument.java
src/main/java/de/juplo/kafka/TopicPartitionSerializer.java [deleted file]

index c14f45b..13176d2 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -47,6 +47,7 @@ http -v :8081/seen
 sleep 1
 http -v :8081/seen
 
+docker-compose stop producer
 docker-compose exec -T cli bash << 'EOF'
 echo "Altering number of partitions from 3 to 7..."
 # tag::altertopic[]
@@ -55,7 +56,7 @@ kafka-topics --bootstrap-server kafka:9092 --describe --topic test
 # end::altertopic[]
 EOF
 
-docker-compose restart producer
+docker-compose start producer
 sleep 1
 http -v :8081/seen
 sleep 1
index 23c845a..bcbf418 100644 (file)
@@ -5,7 +5,6 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
-import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
 import org.springframework.util.Assert;
 
 import java.util.concurrent.Executors;
@@ -42,16 +41,6 @@ public class Application
     return consumer;
   }
 
-  @Bean
-  public Jackson2ObjectMapperBuilder jackson2ObjectMapperBuilder()
-  {
-    return
-        new Jackson2ObjectMapperBuilder().serializers(
-            new TopicPartitionSerializer(),
-            new PartitionStatisticsSerializer());
-  }
-
-
   public static void main(String[] args)
   {
     SpringApplication.run(Application.class, args);
index ddff42e..a504842 100644 (file)
@@ -1,7 +1,6 @@
 package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
-import org.apache.kafka.common.TopicPartition;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RestController;
@@ -31,7 +30,7 @@ public class DriverController
 
 
   @GetMapping("seen")
-  public Map<TopicPartition, PartitionStatistics> seen()
+  public Map<Integer, Map<String, Integer>> seen()
   {
     return consumer.getSeen();
   }
index e67bf41..7cb77aa 100644 (file)
@@ -34,7 +34,7 @@ public class EndlessConsumer implements Runnable
   private KafkaConsumer<String, String> consumer = null;
   private Future<?> future = null;
 
-  private final Map<TopicPartition, PartitionStatistics> seen = new HashMap<>();
+  private final Map<Integer, Map<String, Integer>> seen = new HashMap<>();
 
 
   public EndlessConsumer(
@@ -80,17 +80,17 @@ public class EndlessConsumer implements Runnable
           partitions.forEach(tp ->
           {
             log.info("{} - removing partition: {}", id, tp);
-            PartitionStatistics removed = seen.remove(tp);
-            for (KeyCounter counter : removed.getStatistics())
+            Map<String, Integer> removed = seen.remove(tp.partition());
+            for (String key : removed.keySet())
             {
               log.info(
                   "{} - Seen {} messages for partition={}|key={}",
                   id,
-                  counter.getResult(),
-                  removed.getPartition(),
-                  counter.getKey());
+                  removed.get(key),
+                  tp.partition(),
+                  key);
             }
-            repository.save(new StatisticsDocument(removed));
+            repository.save(new StatisticsDocument(tp.partition(), removed));
           });
         }
 
@@ -101,11 +101,11 @@ public class EndlessConsumer implements Runnable
           {
             log.info("{} - adding partition: {}", id, tp);
             seen.put(
-                tp,
+                tp.partition(),
                 repository
-                    .findById(tp.toString())
-                    .map(PartitionStatistics::new)
-                    .orElse(new PartitionStatistics(tp)));
+                    .findById(Integer.toString(tp.partition()))
+                    .map(document -> document.statistics)
+                    .orElse(new HashMap<>()));
           });
         }
       });
@@ -130,9 +130,16 @@ public class EndlessConsumer implements Runnable
               record.value()
           );
 
-          TopicPartition partition = new TopicPartition(record.topic(), record.partition());
+          Integer partition = record.partition();
           String key = record.key() == null ? "NULL" : record.key();
-          seen.get(partition).increment(key);
+          Map<String, Integer> byKey = seen.get(partition);
+
+          if (!byKey.containsKey(key))
+            byKey.put(key, 0);
+
+          int seenByKey = byKey.get(key);
+          seenByKey++;
+          byKey.put(key, seenByKey);
         }
       }
     }
@@ -153,7 +160,7 @@ public class EndlessConsumer implements Runnable
     }
   }
 
-  public Map<TopicPartition, PartitionStatistics> getSeen()
+  public Map<Integer, Map<String, Integer>> getSeen()
   {
     return seen;
   }
diff --git a/src/main/java/de/juplo/kafka/KeyCounter.java b/src/main/java/de/juplo/kafka/KeyCounter.java
deleted file mode 100644 (file)
index 1e3cbd2..0000000
+++ /dev/null
@@ -1,31 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.ToString;
-
-
-@RequiredArgsConstructor
-@Getter
-@EqualsAndHashCode(of = { "key" })
-@ToString
-public class KeyCounter
-{
-  private final String key;
-
-  private long result = 0;
-
-
-  public KeyCounter(String key, long initialValue)
-  {
-    this.key = key;
-    this.result = initialValue;
-  }
-
-
-  public long increment()
-  {
-    return ++result;
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/PartitionStatistics.java b/src/main/java/de/juplo/kafka/PartitionStatistics.java
deleted file mode 100644 (file)
index 0e31945..0000000
+++ /dev/null
@@ -1,75 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-
-@Getter
-@EqualsAndHashCode(of = { "partition" })
-public class PartitionStatistics
-{
-  private String id;
-  private final TopicPartition partition;
-  private final Map<String, KeyCounter> statistics = new HashMap<>();
-
-
-  public PartitionStatistics(TopicPartition partition)
-  {
-    this.partition = partition;
-  }
-
-  public PartitionStatistics(StatisticsDocument document)
-  {
-    this.partition = new TopicPartition(document.topic, document.partition);
-    document
-        .statistics
-        .entrySet()
-        .forEach(entry ->
-        {
-          this.statistics.put(
-              entry.getKey(),
-              new KeyCounter(entry.getKey(), entry.getValue()));
-        });
-  }
-
-
-  public KeyCounter addKey(String key)
-  {
-    KeyCounter counter = new KeyCounter(key);
-
-    counter.increment();
-    statistics.put(key, counter);
-
-    return counter;
-  }
-
-
-  public long increment(String key)
-  {
-    KeyCounter counter = statistics.get(key);
-
-    if (counter == null)
-    {
-      counter = new KeyCounter(key);
-      statistics.put(key, counter);
-    }
-
-    return counter.increment();
-  }
-
-  public Collection<KeyCounter> getStatistics()
-  {
-    return statistics.values();
-  }
-
-  @Override
-  public String toString()
-  {
-    return partition.toString();
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/PartitionStatisticsSerializer.java b/src/main/java/de/juplo/kafka/PartitionStatisticsSerializer.java
deleted file mode 100644 (file)
index ed8230d..0000000
+++ /dev/null
@@ -1,43 +0,0 @@
-package de.juplo.kafka;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.TopicPartition;
-
-import java.io.IOException;
-
-
-@Slf4j
-public class PartitionStatisticsSerializer extends JsonSerializer<PartitionStatistics>
-{
-  @Override
-  public Class<PartitionStatistics> handledType()
-  {
-    return PartitionStatistics.class;
-  }
-
-  @Override
-  public void serialize(
-      PartitionStatistics statistics,
-      JsonGenerator jsonGenerator,
-      SerializerProvider serializerProvider) throws IOException
-  {
-    jsonGenerator.writeStartObject();
-    statistics
-        .getStatistics()
-        .forEach((counter) ->
-        {
-          try
-          {
-            jsonGenerator.writeNumberField(counter.getKey(), counter.getResult());
-          }
-          catch (NumberFormatException | IOException e)
-          {
-            log.error("Could not write {}: {}", counter, e.toString());
-          }
-        });
-    jsonGenerator.writeEndObject();
-  }
-}
index 9318c4c..be998ca 100644 (file)
@@ -14,26 +14,15 @@ public class StatisticsDocument
 {
   @Id
   public String id;
-  public String topic;
-  public Integer partition;
-  public Map<String, Long> statistics;
+  public Map<String, Integer> statistics;
 
   public StatisticsDocument()
   {
   }
 
-  public StatisticsDocument(String topic, Integer partition, Map<String, Long> statistics)
+  public StatisticsDocument(Integer partition, Map<String, Integer> statistics)
   {
-    this.partition = partition;
+    this.id = Integer.toString(partition);
     this.statistics = statistics;
   }
-
-  public StatisticsDocument(PartitionStatistics statistics)
-  {
-    this.topic = statistics.getPartition().topic();
-    this.id = statistics.toString();
-    this.partition = statistics.getPartition().partition();
-    this.statistics = new HashMap<>();
-    statistics.getStatistics().forEach(counter -> this.statistics.put(counter.getKey(), counter.getResult()));
-  }
 }
diff --git a/src/main/java/de/juplo/kafka/TopicPartitionSerializer.java b/src/main/java/de/juplo/kafka/TopicPartitionSerializer.java
deleted file mode 100644 (file)
index e7190a5..0000000
+++ /dev/null
@@ -1,27 +0,0 @@
-package de.juplo.kafka;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import org.apache.kafka.common.TopicPartition;
-
-import java.io.IOException;
-
-
-public class TopicPartitionSerializer extends JsonSerializer<TopicPartition>
-{
-  @Override
-  public Class<TopicPartition> handledType()
-  {
-    return TopicPartition.class;
-  }
-
-  @Override
-  public void serialize(
-      TopicPartition topicPartition,
-      JsonGenerator jsonGenerator,
-      SerializerProvider serializerProvider) throws IOException
-  {
-    jsonGenerator.writeString(topicPartition.toString());
-  }
-}