Verschachtelte Map gegen fachliche Datenstruktur ausgetauscht
author: Kai Moritz <kai@juplo.de>
Wed, 6 Apr 2022 06:34:18 +0000 (08:34 +0200)
committer: Kai Moritz <kai@juplo.de>
Wed, 6 Apr 2022 17:46:22 +0000 (19:46 +0200)
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/KeyCounter.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/PartitionStatistics.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/PartitionStatisticsSerializer.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/TopicPartitionSerializer.java [new file with mode: 0644]

index dd4b20a..7cfe268 100644 (file)
@@ -5,6 +5,7 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
+import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
 import org.springframework.util.Assert;
 
 import java.util.concurrent.Executors;
@@ -40,6 +41,16 @@ public class Application
     return consumer;
   }
 
+  @Bean
+  public Jackson2ObjectMapperBuilder jackson2ObjectMapperBuilder()
+  {
+    return
+        new Jackson2ObjectMapperBuilder().serializers(
+            new TopicPartitionSerializer(),
+            new PartitionStatisticsSerializer());
+  }
+
+
   public static void main(String[] args)
   {
     SpringApplication.run(Application.class, args);
index a504842..ddff42e 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
+import org.apache.kafka.common.TopicPartition;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RestController;
@@ -30,7 +31,7 @@ public class DriverController
 
 
   @GetMapping("seen")
-  public Map<Integer, Map<String, Integer>> seen()
+  public Map<TopicPartition, PartitionStatistics> seen()
   {
     return consumer.getSeen();
   }
index be371ae..e3a60b5 100644 (file)
@@ -33,7 +33,7 @@ public class EndlessConsumer implements Runnable
   private KafkaConsumer<String, String> consumer = null;
   private Future<?> future = null;
 
-  private final Map<Integer, Map<String, Integer>> seen = new HashMap<>();
+  private final Map<TopicPartition, PartitionStatistics> seen = new HashMap<>();
 
 
   public EndlessConsumer(
@@ -77,15 +77,15 @@ public class EndlessConsumer implements Runnable
           partitions.forEach(tp ->
           {
             log.info("{} - removing partition: {}", id, tp);
-            Map<String, Integer> removed = seen.remove(tp.partition());
-            for (String key : removed.keySet())
+            PartitionStatistics removed = seen.remove(tp);
+            for (KeyCounter counter : removed.getStatistics())
             {
               log.info(
                   "{} - Seen {} messages for partition={}|key={}",
                   id,
-                  removed.get(key),
-                  removed,
-                  key);
+                  counter.getCounter(),
+                  removed.getPartition(),
+                  counter.getKey());
             }
           });
         }
@@ -96,7 +96,7 @@ public class EndlessConsumer implements Runnable
           partitions.forEach(tp ->
           {
             log.info("{} - adding partition: {}", id, tp);
-            seen.put(tp.partition(), new HashMap<>());
+            seen.put(tp, new PartitionStatistics(tp));
           });
         }
       });
@@ -121,16 +121,9 @@ public class EndlessConsumer implements Runnable
               record.value()
           );
 
-          Integer partition = record.partition();
+          TopicPartition partition = new TopicPartition(record.topic(), record.partition());
           String key = record.key() == null ? "NULL" : record.key();
-          Map<String, Integer> byKey = seen.get(partition);
-
-          if (!byKey.containsKey(key))
-            byKey.put(key, 0);
-
-          int seenByKey = byKey.get(key);
-          seenByKey++;
-          byKey.put(key, seenByKey);
+          seen.get(partition).increment(key);
         }
       }
     }
@@ -151,7 +144,7 @@ public class EndlessConsumer implements Runnable
     }
   }
 
-  public Map<Integer, Map<String, Integer>> getSeen()
+  public Map<TopicPartition, PartitionStatistics> getSeen()
   {
     return seen;
   }
diff --git a/src/main/java/de/juplo/kafka/KeyCounter.java b/src/main/java/de/juplo/kafka/KeyCounter.java
new file mode 100644 (file)
index 0000000..147dcd4
--- /dev/null
@@ -0,0 +1,30 @@
+package de.juplo.kafka;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+
+
+/**
+ * Mutable counter for the number of messages seen for a single key.
+ * Equality and hash-code are based on the key only, so the counter value
+ * may change while an instance is stored in a hash-based collection.
+ */
+@RequiredArgsConstructor
+@Getter
+@EqualsAndHashCode(of = { "key" })
+@ToString
+public class KeyCounter
+{
+  private final String key;
+
+  private long counter = 0;
+
+
+  /** Increments the counter and returns the new value. */
+  public long increment()
+  {
+    return ++counter;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/PartitionStatistics.java b/src/main/java/de/juplo/kafka/PartitionStatistics.java
new file mode 100644 (file)
index 0000000..e47a9f9
--- /dev/null
@@ -0,0 +1,61 @@
+package de.juplo.kafka;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Collects per-key message counters for a single {@link TopicPartition}.
+ */
+@RequiredArgsConstructor
+@Getter
+@EqualsAndHashCode(of = { "partition" })
+public class PartitionStatistics
+{
+  private final TopicPartition partition;
+  private final Map<String, KeyCounter> statistics = new HashMap<>();
+
+
+  /**
+   * Registers the key and counts one message for it.
+   * Reuses an already existing counter instead of overwriting it, so that
+   * previously counted messages for the key are not silently lost.
+   *
+   * @return the counter for the given key
+   */
+  public KeyCounter addKey(String key)
+  {
+    KeyCounter counter = statistics.computeIfAbsent(key, KeyCounter::new);
+    counter.increment();
+    return counter;
+  }
+
+
+  /**
+   * Counts one message for the given key, creating the counter on first use.
+   *
+   * @return the new counter value for the key
+   */
+  public long increment(String key)
+  {
+    return statistics.computeIfAbsent(key, KeyCounter::new).increment();
+  }
+
+  /** @return all counters of this partition (live view, not a copy) */
+  public Collection<KeyCounter> getStatistics()
+  {
+    return statistics.values();
+  }
+
+  @Override
+  public String toString()
+  {
+    return partition.toString();
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/PartitionStatisticsSerializer.java b/src/main/java/de/juplo/kafka/PartitionStatisticsSerializer.java
new file mode 100644 (file)
index 0000000..ed8230d
--- /dev/null
@@ -0,0 +1,48 @@
+package de.juplo.kafka;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.IOException;
+
+
+/**
+ * Renders a {@link PartitionStatistics} instance as a JSON object that maps
+ * each message key to the number of messages counted for that key.
+ */
+@Slf4j
+public class PartitionStatisticsSerializer extends JsonSerializer<PartitionStatistics>
+{
+  @Override
+  public Class<PartitionStatistics> handledType()
+  {
+    return PartitionStatistics.class;
+  }
+
+  @Override
+  public void serialize(
+      PartitionStatistics statistics,
+      JsonGenerator jsonGenerator,
+      SerializerProvider serializerProvider) throws IOException
+  {
+    jsonGenerator.writeStartObject();
+    statistics
+        .getStatistics()
+        .forEach((counter) ->
+        {
+          try
+          {
+            // KeyCounter exposes getCounter() -- a getResult() accessor does not exist
+            jsonGenerator.writeNumberField(counter.getKey(), counter.getCounter());
+          }
+          catch (IOException e)
+          {
+            log.error("Could not write {}: {}", counter, e.toString());
+          }
+        });
+    jsonGenerator.writeEndObject();
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/TopicPartitionSerializer.java b/src/main/java/de/juplo/kafka/TopicPartitionSerializer.java
new file mode 100644 (file)
index 0000000..e7190a5
--- /dev/null
@@ -0,0 +1,31 @@
+package de.juplo.kafka;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.IOException;
+
+
+/**
+ * Renders a {@link TopicPartition} map key as its compact
+ * {@code topic-partition} string representation.
+ */
+public class TopicPartitionSerializer extends JsonSerializer<TopicPartition>
+{
+  @Override
+  public Class<TopicPartition> handledType()
+  {
+    return TopicPartition.class;
+  }
+
+  @Override
+  public void serialize(
+      TopicPartition topicPartition,
+      JsonGenerator jsonGenerator,
+      SerializerProvider serializerProvider) throws IOException
+  {
+    jsonGenerator.writeString(topicPartition.toString());
+  }
+}