From: Kai Moritz Date: Sat, 23 Jul 2022 13:41:57 +0000 (+0200) Subject: Merge der überarbeiteten Compose-Konfiguration ('rebalance-listener') X-Git-Tag: sumup-requests---lvm-2-tage~5^2^2^2~4 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=ccf11628d5a1524d4bffe2f1b21b51ad713f1a67;hp=66863e3169440f73ff450c7ba8ea4b9662b180e0;p=demos%2Fkafka%2Ftraining Merge der überarbeiteten Compose-Konfiguration ('rebalance-listener') * Dabei auch die letzten Verbesserungen aus 'rebalance-listener' übernommen. --- diff --git a/docker-compose.yml b/docker-compose.yml index 1b067cd..e30a7bb 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,6 +24,23 @@ services: depends_on: - zookeeper + mongo: + image: mongo:4.4 + ports: + - 27017:27017 + environment: + MONGO_INITDB_ROOT_USERNAME: juplo + MONGO_INITDB_ROOT_PASSWORD: training + + express: + image: mongo-express + ports: + - 8090:8081 + environment: + ME_CONFIG_MONGODB_ADMINUSERNAME: juplo + ME_CONFIG_MONGODB_ADMINPASSWORD: training + ME_CONFIG_MONGODB_URL: mongodb://juplo:training@mongo:27017/ + kafka-ui: image: provectuslabs/kafka-ui:0.3.3 ports: @@ -55,6 +72,7 @@ services: environment: server.port: 8080 consumer.bootstrap-server: kafka:9092 - consumer.client-id: my-group consumer.client-id: consumer consumer.topic: test + spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017 + spring.data.mongodb.database: juplo diff --git a/pom.xml b/pom.xml index 9db9d9d..0fbe7e6 100644 --- a/pom.xml +++ b/pom.xml @@ -21,6 +21,10 @@ org.springframework.boot spring-boot-starter-web + + org.springframework.boot + spring-boot-starter-data-mongodb + org.springframework.boot spring-boot-starter-validation diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index de4b66d..2f6e4f2 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -18,11 +18,12 @@ public class Application @Bean - public EndlessConsumer consumer() + public EndlessConsumer consumer(PartitionStatisticsRepository repository) { EndlessConsumer consumer = new EndlessConsumer( Executors.newFixedThreadPool(1), + repository, properties.getBootstrapServer(), properties.getGroupId(), properties.getClientId(), diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index c2d4447..e5ef7d0 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -23,6 +23,7 @@ import java.util.concurrent.locks.ReentrantLock; public class EndlessConsumer implements Runnable { private final ExecutorService executor; + private final PartitionStatisticsRepository repository; private final String bootstrapServer; private final String groupId; private final String id; @@ -42,6 +43,7 @@ public class EndlessConsumer implements Runnable public EndlessConsumer( ExecutorService executor, + PartitionStatisticsRepository repository, String bootstrapServer, String groupId, String clientId, @@ -49,6 +51,7 @@ public class EndlessConsumer implements Runnable String autoOffsetReset) { this.executor = executor; + this.repository = repository; this.bootstrapServer = bootstrapServer; this.groupId = groupId; this.id = clientId; @@ -91,6 +94,7 @@ public class EndlessConsumer implements Runnable tp.partition(), key); } + repository.save(new StatisticsDocument(tp.partition(), removed)); }); } @@ -100,7 +104,12 @@ public class EndlessConsumer implements Runnable partitions.forEach(tp -> { log.info("{} - adding partition: {}", id, tp); - seen.put(tp.partition(), new HashMap<>()); + seen.put( + tp.partition(), + repository + .findById(Integer.toString(tp.partition())) + .map(document -> document.statistics) + .orElse(new HashMap<>())); }); } }); diff --git a/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java b/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java new file mode 100644 index 0000000..0ccf3cd --- /dev/null +++ b/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +import org.springframework.data.mongodb.repository.MongoRepository; + +import java.util.Optional; + + +public interface PartitionStatisticsRepository extends MongoRepository +{ + public Optional findById(String partition); +} diff --git a/src/main/java/de/juplo/kafka/StatisticsDocument.java b/src/main/java/de/juplo/kafka/StatisticsDocument.java new file mode 100644 index 0000000..be998ca --- /dev/null +++ b/src/main/java/de/juplo/kafka/StatisticsDocument.java @@ -0,0 +1,28 @@ +package de.juplo.kafka; + +import lombok.ToString; +import org.springframework.data.annotation.Id; +import org.springframework.data.mongodb.core.mapping.Document; + +import java.util.HashMap; +import java.util.Map; + + +@Document(collection = "statistics") +@ToString +public class StatisticsDocument +{ + @Id + public String id; + public Map statistics; + + public StatisticsDocument() + { + } + + public StatisticsDocument(Integer partition, Map statistics) + { + this.id = Integer.toString(partition); + this.statistics = statistics; + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 9f3cb81..93b27c2 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -24,6 +24,11 @@ info: group-id: ${consumer.group-id} topic: ${consumer.topic} auto-offset-reset: ${consumer.auto-offset-reset} +spring: + data: + mongodb: + uri: mongodb://juplo:training@localhost:27017 + database: juplo logging: level: root: INFO