From 915674ec49ba38b3716cc4ef53272e963f139677 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 6 Apr 2022 20:54:37 +0200 Subject: [PATCH] Die Key-Statistiken werden in einer MongoDB gespeichert --- docker-compose.yml | 11 +++++- pom.xml | 4 ++ src/main/java/de/juplo/kafka/Application.java | 3 +- .../java/de/juplo/kafka/EndlessConsumer.java | 11 +++++- src/main/java/de/juplo/kafka/KeyCounter.java | 7 ++++ .../de/juplo/kafka/PartitionStatistics.java | 23 ++++++++++- .../kafka/PartitionStatisticsRepository.java | 11 ++++++ .../de/juplo/kafka/StatisticsDocument.java | 39 +++++++++++++++++++ src/main/resources/application.yml | 5 +++ 9 files changed, 109 insertions(+), 5 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java create mode 100644 src/main/java/de/juplo/kafka/StatisticsDocument.java diff --git a/docker-compose.yml b/docker-compose.yml index 0b1f0ae..3428d16 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,6 +24,14 @@ services: depends_on: - zookeeper + mongo: + image: mongo:4.4 + ports: + - 27017:27017 + environment: + MONGO_INITDB_ROOT_USERNAME: juplo + MONGO_INITDB_ROOT_PASSWORD: training + kafka-ui: image: provectuslabs/kafka-ui:0.3.3 ports: @@ -53,6 +61,7 @@ services: - 8081:8081 environment: consumer.bootstrap-server: kafka:9092 - consumer.client-id: my-group consumer.client-id: consumer consumer.topic: test + spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017 + spring.data.mongodb.database: juplo diff --git a/pom.xml b/pom.xml index b7b0b8d..78b2fde 100644 --- a/pom.xml +++ b/pom.xml @@ -21,6 +21,10 @@ org.springframework.boot spring-boot-starter-web + + org.springframework.boot + spring-boot-starter-data-mongodb + org.springframework.boot spring-boot-starter-actuator diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 7cfe268..23c845a 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -20,7 +20,7 @@ public class Application @Bean - public EndlessConsumer consumer() + public EndlessConsumer consumer(PartitionStatisticsRepository repository) { Assert.hasText(properties.getBootstrapServer(), "consumer.bootstrap-server must be set"); Assert.hasText(properties.getGroupId(), "consumer.group-id must be set"); @@ -30,6 +30,7 @@ public class Application EndlessConsumer consumer = new EndlessConsumer( Executors.newFixedThreadPool(1), + repository, properties.getBootstrapServer(), properties.getGroupId(), properties.getClientId(), diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index c7bc852..e67bf41 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean; public class EndlessConsumer implements Runnable { private final ExecutorService executor; + private final PartitionStatisticsRepository repository; private final String bootstrapServer; private final String groupId; private final String id; @@ -38,6 +39,7 @@ public class EndlessConsumer implements Runnable public EndlessConsumer( ExecutorService executor, + PartitionStatisticsRepository repository, String bootstrapServer, String groupId, String clientId, @@ -45,6 +47,7 @@ public class EndlessConsumer implements Runnable String autoOffsetReset) { this.executor = executor; + this.repository = repository; this.bootstrapServer = bootstrapServer; this.groupId = groupId; this.id = clientId; @@ -87,6 +90,7 @@ public class EndlessConsumer implements Runnable removed.getPartition(), counter.getKey()); } + repository.save(new StatisticsDocument(removed)); }); } @@ -96,7 +100,12 @@ public class EndlessConsumer implements Runnable partitions.forEach(tp -> { log.info("{} - adding partition: {}", id, tp); - seen.put(tp, new PartitionStatistics(tp)); + seen.put( + tp, + repository + .findById(tp.toString()) + .map(PartitionStatistics::new) + .orElse(new PartitionStatistics(tp))); }); } }); diff --git a/src/main/java/de/juplo/kafka/KeyCounter.java b/src/main/java/de/juplo/kafka/KeyCounter.java index b2cde47..1e3cbd2 100644 --- a/src/main/java/de/juplo/kafka/KeyCounter.java +++ b/src/main/java/de/juplo/kafka/KeyCounter.java @@ -17,6 +17,13 @@ public class KeyCounter private long result = 0; + public KeyCounter(String key, long initialValue) + { + this.key = key; + this.result = initialValue; + } + + public long increment() { return ++result; diff --git a/src/main/java/de/juplo/kafka/PartitionStatistics.java b/src/main/java/de/juplo/kafka/PartitionStatistics.java index e47a9f9..0e31945 100644 --- a/src/main/java/de/juplo/kafka/PartitionStatistics.java +++ b/src/main/java/de/juplo/kafka/PartitionStatistics.java @@ -2,7 +2,6 @@ package de.juplo.kafka; import lombok.EqualsAndHashCode; import lombok.Getter; -import lombok.RequiredArgsConstructor; import org.apache.kafka.common.TopicPartition; import java.util.Collection; @@ -10,15 +9,35 @@ import java.util.HashMap; import java.util.Map; -@RequiredArgsConstructor @Getter @EqualsAndHashCode(of = { "partition" }) public class PartitionStatistics { + private String id; private final TopicPartition partition; private final Map statistics = new HashMap<>(); + public PartitionStatistics(TopicPartition partition) + { + this.partition = partition; + } + + public PartitionStatistics(StatisticsDocument document) + { + this.partition = new TopicPartition(document.topic, document.partition); + document + .statistics + .entrySet() + .forEach(entry -> + { + this.statistics.put( + entry.getKey(), + new KeyCounter(entry.getKey(), entry.getValue())); + }); + } + + public KeyCounter addKey(String key) { KeyCounter counter = new KeyCounter(key); diff --git a/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java b/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java new file mode 100644 index 0000000..0ccf3cd --- /dev/null +++ b/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +import org.springframework.data.mongodb.repository.MongoRepository; + +import java.util.Optional; + + +public interface PartitionStatisticsRepository extends MongoRepository +{ + public Optional findById(String partition); +} diff --git a/src/main/java/de/juplo/kafka/StatisticsDocument.java b/src/main/java/de/juplo/kafka/StatisticsDocument.java new file mode 100644 index 0000000..9318c4c --- /dev/null +++ b/src/main/java/de/juplo/kafka/StatisticsDocument.java @@ -0,0 +1,39 @@ +package de.juplo.kafka; + +import lombok.ToString; +import org.springframework.data.annotation.Id; +import org.springframework.data.mongodb.core.mapping.Document; + +import java.util.HashMap; +import java.util.Map; + + +@Document(collection = "statistics") +@ToString +public class StatisticsDocument +{ + @Id + public String id; + public String topic; + public Integer partition; + public Map statistics; + + public StatisticsDocument() + { + } + + public StatisticsDocument(String topic, Integer partition, Map statistics) + { + this.partition = partition; + this.statistics = statistics; + } + + public StatisticsDocument(PartitionStatistics statistics) + { + this.topic = statistics.getPartition().topic(); + this.id = statistics.toString(); + this.partition = statistics.getPartition().partition(); + this.statistics = new HashMap<>(); + statistics.getStatistics().forEach(counter -> this.statistics.put(counter.getKey(), counter.getResult())); + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index db37822..0e7c53c 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -9,6 +9,11 @@ management: web: exposure: include: "*" +spring: + data: + mongodb: + uri: mongodb://juplo:training@localhost:27017 + database: juplo logging: level: root: INFO -- 2.20.1