From 32a052b7c494009c59190857984ef3563f4f2b14 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 6 Apr 2022 08:34:18 +0200 Subject: [PATCH] Verschachtelte Map gegen fachliche Datenstruktur ausgetauscht --- src/main/java/de/juplo/kafka/Application.java | 11 ++++ .../java/de/juplo/kafka/DriverController.java | 3 +- .../java/de/juplo/kafka/EndlessConsumer.java | 27 ++++----- src/main/java/de/juplo/kafka/KeyCounter.java | 24 ++++++++ .../de/juplo/kafka/PartitionStatistics.java | 56 +++++++++++++++++++ .../kafka/PartitionStatisticsSerializer.java | 43 ++++++++++++++ .../juplo/kafka/TopicPartitionSerializer.java | 27 +++++++++ 7 files changed, 173 insertions(+), 18 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/KeyCounter.java create mode 100644 src/main/java/de/juplo/kafka/PartitionStatistics.java create mode 100644 src/main/java/de/juplo/kafka/PartitionStatisticsSerializer.java create mode 100644 src/main/java/de/juplo/kafka/TopicPartitionSerializer.java diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index dd4b20a..7cfe268 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -5,6 +5,7 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder; import org.springframework.util.Assert; import java.util.concurrent.Executors; @@ -40,6 +41,16 @@ public class Application return consumer; } + @Bean + public Jackson2ObjectMapperBuilder jackson2ObjectMapperBuilder() + { + return + new Jackson2ObjectMapperBuilder().serializers( + new TopicPartitionSerializer(), + new PartitionStatisticsSerializer()); + } + + public static void main(String[] args) { SpringApplication.run(Application.class, args); diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index a504842..ddff42e 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -1,6 +1,7 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; +import org.apache.kafka.common.TopicPartition; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; @@ -30,7 +31,7 @@ public class DriverController @GetMapping("seen") - public Map> seen() + public Map seen() { return consumer.getSeen(); } diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index be371ae..e3a60b5 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -33,7 +33,7 @@ public class EndlessConsumer implements Runnable private KafkaConsumer consumer = null; private Future future = null; - private final Map> seen = new HashMap<>(); + private final Map seen = new HashMap<>(); public EndlessConsumer( @@ -77,15 +77,15 @@ public class EndlessConsumer implements Runnable partitions.forEach(tp -> { log.info("{} - removing partition: {}", id, tp); - Map removed = seen.remove(tp.partition()); - for (String key : removed.keySet()) + PartitionStatistics removed = seen.remove(tp); + for (KeyCounter counter : removed.getStatistics()) { log.info( "{} - Seen {} messages for partition={}|key={}", id, - removed.get(key), - removed, - key); + counter.getCounter(), + removed.getPartition(), + counter.getKey()); } }); } @@ -96,7 +96,7 @@ public class EndlessConsumer implements Runnable partitions.forEach(tp -> { log.info("{} - adding partition: {}", id, tp); - seen.put(tp.partition(), new HashMap<>()); + seen.put(tp, new PartitionStatistics(tp)); }); } }); @@ -121,16 +121,9 @@ public class EndlessConsumer implements Runnable record.value() ); - Integer partition = record.partition(); + TopicPartition partition = new TopicPartition(record.topic(), record.partition()); String key = record.key() == null ? "NULL" : record.key(); - Map byKey = seen.get(partition); - - if (!byKey.containsKey(key)) - byKey.put(key, 0); - - int seenByKey = byKey.get(key); - seenByKey++; - byKey.put(key, seenByKey); + seen.get(partition).increment(key); } } } @@ -151,7 +144,7 @@ public class EndlessConsumer implements Runnable } } - public Map> getSeen() + public Map getSeen() { return seen; } diff --git a/src/main/java/de/juplo/kafka/KeyCounter.java b/src/main/java/de/juplo/kafka/KeyCounter.java new file mode 100644 index 0000000..147dcd4 --- /dev/null +++ b/src/main/java/de/juplo/kafka/KeyCounter.java @@ -0,0 +1,24 @@ +package de.juplo.kafka; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.ToString; + + +@RequiredArgsConstructor +@Getter +@EqualsAndHashCode(of = { "key" }) +@ToString +public class KeyCounter +{ + private final String key; + + private long counter = 0; + + + public long increment() + { + return ++counter; + } +} diff --git a/src/main/java/de/juplo/kafka/PartitionStatistics.java b/src/main/java/de/juplo/kafka/PartitionStatistics.java new file mode 100644 index 0000000..e47a9f9 --- /dev/null +++ b/src/main/java/de/juplo/kafka/PartitionStatistics.java @@ -0,0 +1,56 @@ +package de.juplo.kafka; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.common.TopicPartition; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + + +@RequiredArgsConstructor +@Getter +@EqualsAndHashCode(of = { "partition" }) +public class PartitionStatistics +{ + private final TopicPartition partition; + private final Map statistics = new HashMap<>(); + + + public KeyCounter addKey(String key) + { + KeyCounter counter = new KeyCounter(key); + + counter.increment(); + statistics.put(key, counter); + + return counter; + } + + + public long increment(String key) + { + KeyCounter counter = statistics.get(key); + + if (counter == null) + { + counter = new KeyCounter(key); + statistics.put(key, counter); + } + + return counter.increment(); + } + + public Collection getStatistics() + { + return statistics.values(); + } + + @Override + public String toString() + { + return partition.toString(); + } +} diff --git a/src/main/java/de/juplo/kafka/PartitionStatisticsSerializer.java b/src/main/java/de/juplo/kafka/PartitionStatisticsSerializer.java new file mode 100644 index 0000000..ed8230d --- /dev/null +++ b/src/main/java/de/juplo/kafka/PartitionStatisticsSerializer.java @@ -0,0 +1,43 @@ +package de.juplo.kafka; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.TopicPartition; + +import java.io.IOException; + + +@Slf4j +public class PartitionStatisticsSerializer extends JsonSerializer +{ + @Override + public Class handledType() + { + return PartitionStatistics.class; + } + + @Override + public void serialize( + PartitionStatistics statistics, + JsonGenerator jsonGenerator, + SerializerProvider serializerProvider) throws IOException + { + jsonGenerator.writeStartObject(); + statistics + .getStatistics() + .forEach((counter) -> + { + try + { + jsonGenerator.writeNumberField(counter.getKey(), counter.getResult()); + } + catch (NumberFormatException | IOException e) + { + log.error("Could not write {}: {}", counter, e.toString()); + } + }); + jsonGenerator.writeEndObject(); + } +} diff --git a/src/main/java/de/juplo/kafka/TopicPartitionSerializer.java b/src/main/java/de/juplo/kafka/TopicPartitionSerializer.java new file mode 100644 index 0000000..e7190a5 --- /dev/null +++ b/src/main/java/de/juplo/kafka/TopicPartitionSerializer.java @@ -0,0 +1,27 @@ +package de.juplo.kafka; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import org.apache.kafka.common.TopicPartition; + +import java.io.IOException; + + +public class TopicPartitionSerializer extends JsonSerializer +{ + @Override + public Class handledType() + { + return TopicPartition.class; + } + + @Override + public void serialize( + TopicPartition topicPartition, + JsonGenerator jsonGenerator, + SerializerProvider serializerProvider) throws IOException + { + jsonGenerator.writeString(topicPartition.toString()); + } +} -- 2.20.1