Rückbau auf verschachtelte Maps
author: Kai Moritz <kai@juplo.de>
Thu, 7 Apr 2022 07:25:41 +0000 (09:25 +0200)
committer: Kai Moritz <kai@juplo.de>
Thu, 7 Apr 2022 22:48:59 +0000 (00:48 +0200)
* Die zuvor erfundenen fachlichen Klassen passen nicht dazu, dass man
  sie - wie gedacht - direkt in MongoDB stopfen könnte.
* Daher hier erst mal der Rückbau auf Maps, da das dann für die Übungen
  einfacher ist.

README.sh
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/KeyCounter.java [deleted file]
src/main/java/de/juplo/kafka/PartitionStatistics.java [deleted file]
src/main/java/de/juplo/kafka/PartitionStatisticsSerializer.java [deleted file]
src/main/java/de/juplo/kafka/TopicPartitionSerializer.java [deleted file]

index c14f45b..13176d2 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -47,6 +47,7 @@ http -v :8081/seen
 sleep 1
 http -v :8081/seen
 
+docker-compose stop producer
 docker-compose exec -T cli bash << 'EOF'
 echo "Altering number of partitions from 3 to 7..."
 # tag::altertopic[]
@@ -55,7 +56,7 @@ kafka-topics --bootstrap-server kafka:9092 --describe --topic test
 # end::altertopic[]
 EOF
 
-docker-compose restart producer
+docker-compose start producer
 sleep 1
 http -v :8081/seen
 sleep 1
index 7cfe268..dd4b20a 100644 (file)
@@ -5,7 +5,6 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
-import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
 import org.springframework.util.Assert;
 
 import java.util.concurrent.Executors;
@@ -41,16 +40,6 @@ public class Application
     return consumer;
   }
 
-  @Bean
-  public Jackson2ObjectMapperBuilder jackson2ObjectMapperBuilder()
-  {
-    return
-        new Jackson2ObjectMapperBuilder().serializers(
-            new TopicPartitionSerializer(),
-            new PartitionStatisticsSerializer());
-  }
-
-
   public static void main(String[] args)
   {
     SpringApplication.run(Application.class, args);
index ddff42e..a504842 100644 (file)
@@ -1,7 +1,6 @@
 package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
-import org.apache.kafka.common.TopicPartition;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RestController;
@@ -31,7 +30,7 @@ public class DriverController
 
 
   @GetMapping("seen")
-  public Map<TopicPartition, PartitionStatistics> seen()
+  public Map<Integer, Map<String, Integer>> seen()
   {
     return consumer.getSeen();
   }
index c7bc852..14a875b 100644 (file)
@@ -33,7 +33,7 @@ public class EndlessConsumer implements Runnable
   private KafkaConsumer<String, String> consumer = null;
   private Future<?> future = null;
 
-  private final Map<TopicPartition, PartitionStatistics> seen = new HashMap<>();
+  private final Map<Integer, Map<String, Integer>> seen = new HashMap<>();
 
 
   public EndlessConsumer(
@@ -77,15 +77,15 @@ public class EndlessConsumer implements Runnable
           partitions.forEach(tp ->
           {
             log.info("{} - removing partition: {}", id, tp);
-            PartitionStatistics removed = seen.remove(tp);
-            for (KeyCounter counter : removed.getStatistics())
+            Map<String, Integer> removed = seen.remove(tp.partition());
+            for (String key : removed.keySet())
             {
               log.info(
                   "{} - Seen {} messages for partition={}|key={}",
                   id,
-                  counter.getResult(),
-                  removed.getPartition(),
-                  counter.getKey());
+                  removed.get(key),
+                  tp.partition(),
+                  key);
             }
           });
         }
@@ -96,7 +96,7 @@ public class EndlessConsumer implements Runnable
           partitions.forEach(tp ->
           {
             log.info("{} - adding partition: {}", id, tp);
-            seen.put(tp, new PartitionStatistics(tp));
+            seen.put(tp.partition(), new HashMap<>());
           });
         }
       });
@@ -121,9 +121,16 @@ public class EndlessConsumer implements Runnable
               record.value()
           );
 
-          TopicPartition partition = new TopicPartition(record.topic(), record.partition());
+          Integer partition = record.partition();
           String key = record.key() == null ? "NULL" : record.key();
-          seen.get(partition).increment(key);
+          Map<String, Integer> byKey = seen.get(partition);
+
+          if (!byKey.containsKey(key))
+            byKey.put(key, 0);
+
+          int seenByKey = byKey.get(key);
+          seenByKey++;
+          byKey.put(key, seenByKey);
         }
       }
     }
@@ -144,7 +151,7 @@ public class EndlessConsumer implements Runnable
     }
   }
 
-  public Map<TopicPartition, PartitionStatistics> getSeen()
+  public Map<Integer, Map<String, Integer>> getSeen()
   {
     return seen;
   }
diff --git a/src/main/java/de/juplo/kafka/KeyCounter.java b/src/main/java/de/juplo/kafka/KeyCounter.java
deleted file mode 100644 (file)
index b2cde47..0000000
+++ /dev/null
@@ -1,24 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.ToString;
-
-
-@RequiredArgsConstructor
-@Getter
-@EqualsAndHashCode(of = { "key" })
-@ToString
-public class KeyCounter
-{
-  private final String key;
-
-  private long result = 0;
-
-
-  public long increment()
-  {
-    return ++result;
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/PartitionStatistics.java b/src/main/java/de/juplo/kafka/PartitionStatistics.java
deleted file mode 100644 (file)
index e47a9f9..0000000
+++ /dev/null
@@ -1,56 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-
-@RequiredArgsConstructor
-@Getter
-@EqualsAndHashCode(of = { "partition" })
-public class PartitionStatistics
-{
-  private final TopicPartition partition;
-  private final Map<String, KeyCounter> statistics = new HashMap<>();
-
-
-  public KeyCounter addKey(String key)
-  {
-    KeyCounter counter = new KeyCounter(key);
-
-    counter.increment();
-    statistics.put(key, counter);
-
-    return counter;
-  }
-
-
-  public long increment(String key)
-  {
-    KeyCounter counter = statistics.get(key);
-
-    if (counter == null)
-    {
-      counter = new KeyCounter(key);
-      statistics.put(key, counter);
-    }
-
-    return counter.increment();
-  }
-
-  public Collection<KeyCounter> getStatistics()
-  {
-    return statistics.values();
-  }
-
-  @Override
-  public String toString()
-  {
-    return partition.toString();
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/PartitionStatisticsSerializer.java b/src/main/java/de/juplo/kafka/PartitionStatisticsSerializer.java
deleted file mode 100644 (file)
index ed8230d..0000000
+++ /dev/null
@@ -1,43 +0,0 @@
-package de.juplo.kafka;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.TopicPartition;
-
-import java.io.IOException;
-
-
-@Slf4j
-public class PartitionStatisticsSerializer extends JsonSerializer<PartitionStatistics>
-{
-  @Override
-  public Class<PartitionStatistics> handledType()
-  {
-    return PartitionStatistics.class;
-  }
-
-  @Override
-  public void serialize(
-      PartitionStatistics statistics,
-      JsonGenerator jsonGenerator,
-      SerializerProvider serializerProvider) throws IOException
-  {
-    jsonGenerator.writeStartObject();
-    statistics
-        .getStatistics()
-        .forEach((counter) ->
-        {
-          try
-          {
-            jsonGenerator.writeNumberField(counter.getKey(), counter.getResult());
-          }
-          catch (NumberFormatException | IOException e)
-          {
-            log.error("Could not write {}: {}", counter, e.toString());
-          }
-        });
-    jsonGenerator.writeEndObject();
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/TopicPartitionSerializer.java b/src/main/java/de/juplo/kafka/TopicPartitionSerializer.java
deleted file mode 100644 (file)
index e7190a5..0000000
+++ /dev/null
@@ -1,27 +0,0 @@
-package de.juplo.kafka;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import org.apache.kafka.common.TopicPartition;
-
-import java.io.IOException;
-
-
-public class TopicPartitionSerializer extends JsonSerializer<TopicPartition>
-{
-  @Override
-  public Class<TopicPartition> handledType()
-  {
-    return TopicPartition.class;
-  }
-
-  @Override
-  public void serialize(
-      TopicPartition topicPartition,
-      JsonGenerator jsonGenerator,
-      SerializerProvider serializerProvider) throws IOException
-  {
-    jsonGenerator.writeString(topicPartition.toString());
-  }
-}