Merge branch 'stored-state' into stored-offsets
authorKai Moritz <kai@juplo.de>
Thu, 7 Apr 2022 23:19:01 +0000 (01:19 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 7 Apr 2022 23:19:01 +0000 (01:19 +0200)
docker-compose.yml
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/KeyCounter.java [deleted file]
src/main/java/de/juplo/kafka/PartitionStatistics.java [deleted file]
src/main/java/de/juplo/kafka/PartitionStatisticsSerializer.java [deleted file]
src/main/java/de/juplo/kafka/StatisticsDocument.java
src/main/java/de/juplo/kafka/TopicPartitionSerializer.java [deleted file]
src/main/resources/application.yml

index c7f4e19..b84ed52 100644 (file)
@@ -32,6 +32,15 @@ services:
       MONGO_INITDB_ROOT_USERNAME: juplo
       MONGO_INITDB_ROOT_PASSWORD: training
 
+  express:
+    image: mongo-express
+    ports:
+      - 8090:8081
+    environment:
+      ME_CONFIG_MONGODB_ADMINUSERNAME: juplo
+      ME_CONFIG_MONGODB_ADMINPASSWORD: training
+      ME_CONFIG_MONGODB_URL: mongodb://juplo:training@mongo:27017/
+
   setup:
     image: juplo/toolbox
     command: >
index 23c845a..bcbf418 100644 (file)
@@ -5,7 +5,6 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
-import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
 import org.springframework.util.Assert;
 
 import java.util.concurrent.Executors;
@@ -42,16 +41,6 @@ public class Application
     return consumer;
   }
 
-  @Bean
-  public Jackson2ObjectMapperBuilder jackson2ObjectMapperBuilder()
-  {
-    return
-        new Jackson2ObjectMapperBuilder().serializers(
-            new TopicPartitionSerializer(),
-            new PartitionStatisticsSerializer());
-  }
-
-
   public static void main(String[] args)
   {
     SpringApplication.run(Application.class, args);
index ddff42e..a504842 100644 (file)
@@ -1,7 +1,6 @@
 package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
-import org.apache.kafka.common.TopicPartition;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RestController;
@@ -31,7 +30,7 @@ public class DriverController
 
 
   @GetMapping("seen")
-  public Map<TopicPartition, PartitionStatistics> seen()
+  public Map<Integer, Map<String, Integer>> seen()
   {
     return consumer.getSeen();
   }
index b152310..2563204 100644 (file)
@@ -34,7 +34,7 @@ public class EndlessConsumer implements Runnable
   private KafkaConsumer<String, String> consumer = null;
   private Future<?> future = null;
 
-  private final Map<TopicPartition, PartitionStatistics> seen = new HashMap<>();
+  private final Map<Integer, Map<String, Integer>> seen = new HashMap<>();
 
 
   public EndlessConsumer(
@@ -81,17 +81,17 @@ public class EndlessConsumer implements Runnable
           partitions.forEach(tp ->
           {
             log.info("{} - removing partition: {}", id, tp);
-            PartitionStatistics removed = seen.remove(tp);
-            for (KeyCounter counter : removed.getStatistics())
+            Map<String, Integer> removed = seen.remove(tp.partition());
+            for (String key : removed.keySet())
             {
               log.info(
                   "{} - Seen {} messages for partition={}|key={}",
                   id,
-                  counter.getResult(),
-                  removed.getPartition(),
-                  counter.getKey());
+                  removed.get(key),
+                  tp.partition(),
+                  key);
             }
-            repository.save(new StatisticsDocument(removed, consumer.position(tp)));
+            repository.save(new StatisticsDocument(tp.partition(), removed, consumer.position(tp)));
           });
         }
 
@@ -103,10 +103,10 @@ public class EndlessConsumer implements Runnable
             log.info("{} - adding partition: {}", id, tp);
             StatisticsDocument document =
                 repository
-                    .findById(tp.toString())
-                    .orElse(new StatisticsDocument(tp));
+                    .findById(Integer.toString(tp.partition()))
+                    .orElse(new StatisticsDocument(tp.partition()));
             consumer.seek(tp, document.offset);
-            seen.put(tp, new PartitionStatistics(document));
+            seen.put(tp.partition(), document.statistics);
           });
         }
       });
@@ -131,12 +131,23 @@ public class EndlessConsumer implements Runnable
               record.value()
           );
 
-          TopicPartition partition = new TopicPartition(record.topic(), record.partition());
+          Integer partition = record.partition();
           String key = record.key() == null ? "NULL" : record.key();
-          seen.get(partition).increment(key);
+          Map<String, Integer> byKey = seen.get(partition);
+
+          if (!byKey.containsKey(key))
+            byKey.put(key, 0);
+
+          int seenByKey = byKey.get(key);
+          seenByKey++;
+          byKey.put(key, seenByKey);
         }
 
-        seen.forEach((tp, statistics) -> repository.save(new StatisticsDocument(statistics, consumer.position(tp))));
+        seen.forEach((partiton, statistics) -> repository.save(
+            new StatisticsDocument(
+                partiton,
+                statistics,
+                consumer.position(new TopicPartition(topic, partiton)))));
       }
     }
     catch(WakeupException e)
@@ -156,7 +167,7 @@ public class EndlessConsumer implements Runnable
     }
   }
 
-  public Map<TopicPartition, PartitionStatistics> getSeen()
+  public Map<Integer, Map<String, Integer>> getSeen()
   {
     return seen;
   }
diff --git a/src/main/java/de/juplo/kafka/KeyCounter.java b/src/main/java/de/juplo/kafka/KeyCounter.java
deleted file mode 100644 (file)
index 1e3cbd2..0000000
+++ /dev/null
@@ -1,31 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.ToString;
-
-
-@RequiredArgsConstructor
-@Getter
-@EqualsAndHashCode(of = { "key" })
-@ToString
-public class KeyCounter
-{
-  private final String key;
-
-  private long result = 0;
-
-
-  public KeyCounter(String key, long initialValue)
-  {
-    this.key = key;
-    this.result = initialValue;
-  }
-
-
-  public long increment()
-  {
-    return ++result;
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/PartitionStatistics.java b/src/main/java/de/juplo/kafka/PartitionStatistics.java
deleted file mode 100644 (file)
index 0e31945..0000000
+++ /dev/null
@@ -1,75 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-
-@Getter
-@EqualsAndHashCode(of = { "partition" })
-public class PartitionStatistics
-{
-  private String id;
-  private final TopicPartition partition;
-  private final Map<String, KeyCounter> statistics = new HashMap<>();
-
-
-  public PartitionStatistics(TopicPartition partition)
-  {
-    this.partition = partition;
-  }
-
-  public PartitionStatistics(StatisticsDocument document)
-  {
-    this.partition = new TopicPartition(document.topic, document.partition);
-    document
-        .statistics
-        .entrySet()
-        .forEach(entry ->
-        {
-          this.statistics.put(
-              entry.getKey(),
-              new KeyCounter(entry.getKey(), entry.getValue()));
-        });
-  }
-
-
-  public KeyCounter addKey(String key)
-  {
-    KeyCounter counter = new KeyCounter(key);
-
-    counter.increment();
-    statistics.put(key, counter);
-
-    return counter;
-  }
-
-
-  public long increment(String key)
-  {
-    KeyCounter counter = statistics.get(key);
-
-    if (counter == null)
-    {
-      counter = new KeyCounter(key);
-      statistics.put(key, counter);
-    }
-
-    return counter.increment();
-  }
-
-  public Collection<KeyCounter> getStatistics()
-  {
-    return statistics.values();
-  }
-
-  @Override
-  public String toString()
-  {
-    return partition.toString();
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/PartitionStatisticsSerializer.java b/src/main/java/de/juplo/kafka/PartitionStatisticsSerializer.java
deleted file mode 100644 (file)
index ed8230d..0000000
+++ /dev/null
@@ -1,43 +0,0 @@
-package de.juplo.kafka;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.TopicPartition;
-
-import java.io.IOException;
-
-
-@Slf4j
-public class PartitionStatisticsSerializer extends JsonSerializer<PartitionStatistics>
-{
-  @Override
-  public Class<PartitionStatistics> handledType()
-  {
-    return PartitionStatistics.class;
-  }
-
-  @Override
-  public void serialize(
-      PartitionStatistics statistics,
-      JsonGenerator jsonGenerator,
-      SerializerProvider serializerProvider) throws IOException
-  {
-    jsonGenerator.writeStartObject();
-    statistics
-        .getStatistics()
-        .forEach((counter) ->
-        {
-          try
-          {
-            jsonGenerator.writeNumberField(counter.getKey(), counter.getResult());
-          }
-          catch (NumberFormatException | IOException e)
-          {
-            log.error("Could not write {}: {}", counter, e.toString());
-          }
-        });
-    jsonGenerator.writeEndObject();
-  }
-}
index e8c2e9b..96ebfb1 100644 (file)
@@ -1,7 +1,6 @@
 package de.juplo.kafka;
 
 import lombok.ToString;
-import org.apache.kafka.common.TopicPartition;
 import org.springframework.data.annotation.Id;
 import org.springframework.data.mongodb.core.mapping.Document;
 
@@ -15,35 +14,23 @@ public class StatisticsDocument
 {
   @Id
   public String id;
-  public String topic;
-  public Integer partition;
   public long offset;
-  public Map<String, Long> statistics;
+  public Map<String, Integer> statistics;
 
   public StatisticsDocument()
   {
   }
 
-  public StatisticsDocument(TopicPartition tp)
+  public StatisticsDocument(Integer partition)
   {
-    this.topic = tp.topic();
-    this.partition = tp.partition();
-    this.offset = 0;
+    this.id = Integer.toString(partition);
+    this.statistics = new HashMap<>();
   }
 
-  public StatisticsDocument(String topic, Integer partition, Map<String, Long> statistics)
+  public StatisticsDocument(Integer partition, Map<String, Integer> statistics, long offset)
   {
-    this.partition = partition;
+    this.id = Integer.toString(partition);
     this.statistics = statistics;
-  }
-
-  public StatisticsDocument(PartitionStatistics statistics, long offset)
-  {
-    this.topic = statistics.getPartition().topic();
-    this.id = statistics.toString();
-    this.partition = statistics.getPartition().partition();
     this.offset = offset;
-    this.statistics = new HashMap<>();
-    statistics.getStatistics().forEach(counter -> this.statistics.put(counter.getKey(), counter.getResult()));
   }
 }
diff --git a/src/main/java/de/juplo/kafka/TopicPartitionSerializer.java b/src/main/java/de/juplo/kafka/TopicPartitionSerializer.java
deleted file mode 100644 (file)
index e7190a5..0000000
+++ /dev/null
@@ -1,27 +0,0 @@
-package de.juplo.kafka;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import org.apache.kafka.common.TopicPartition;
-
-import java.io.IOException;
-
-
-public class TopicPartitionSerializer extends JsonSerializer<TopicPartition>
-{
-  @Override
-  public Class<TopicPartition> handledType()
-  {
-    return TopicPartition.class;
-  }
-
-  @Override
-  public void serialize(
-      TopicPartition topicPartition,
-      JsonGenerator jsonGenerator,
-      SerializerProvider serializerProvider) throws IOException
-  {
-    jsonGenerator.writeString(topicPartition.toString());
-  }
-}
index 0e7c53c..94490a3 100644 (file)
@@ -1,7 +1,7 @@
 consumer:
   bootstrap-server: :9092
-  group-id: my-consumer
-  client-id: peter
+  group-id: my-group
+  client-id: IDE
   topic: test
   auto-offset-reset: earliest
 management: