From: Kai Moritz Date: Sat, 13 Aug 2022 08:48:49 +0000 (+0200) Subject: Verbesserungen aus 'stored-state' nach 'rebalance-listener' gemerged X-Git-Tag: sumup-requests---lvm-2-tage~5^2^2 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=de0468e4db973312e61ad4894edc092e84655161;hp=bf99986fe3daf527d93d444ce1ae7374a8faddcc;p=demos%2Fkafka%2Ftraining Verbesserungen aus 'stored-state' nach 'rebalance-listener' gemerged --- diff --git a/README.sh b/README.sh index 39e9300..13176d2 100755 --- a/README.sh +++ b/README.sh @@ -9,7 +9,7 @@ then exit fi -docker-compose up -d zookeeper kafka cli mongo express +docker-compose up -d zookeeper kafka cli if [[ $(docker image ls -q $IMAGE) == "" || diff --git a/docker-compose.yml b/docker-compose.yml index 7bcf68c..df6b321 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,25 +24,6 @@ services: depends_on: - zookeeper - mongo: - image: mongo:4.4.13 - ports: - - 27017:27017 - environment: - MONGO_INITDB_ROOT_USERNAME: juplo - MONGO_INITDB_ROOT_PASSWORD: training - - express: - image: mongo-express - ports: - - 8090:8081 - environment: - ME_CONFIG_MONGODB_ADMINUSERNAME: juplo - ME_CONFIG_MONGODB_ADMINPASSWORD: training - ME_CONFIG_MONGODB_URL: mongodb://juplo:training@mongo:27017/ - depends_on: - - mongo - kafka-ui: image: provectuslabs/kafka-ui:0.3.3 ports: @@ -74,5 +55,3 @@ services: server.port: 8080 consumer.bootstrap-server: kafka:9092 consumer.client-id: consumer - spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017 - spring.data.mongodb.database: juplo diff --git a/pom.xml b/pom.xml index 701704d..1f5caab 100644 --- a/pom.xml +++ b/pom.xml @@ -21,10 +21,6 @@ org.springframework.boot spring-boot-starter-web - - org.springframework.boot - spring-boot-starter-data-mongodb - org.springframework.boot spring-boot-starter-validation @@ -61,11 +57,6 @@ awaitility test - - de.flapdoodle.embed - de.flapdoodle.embed.mongo - test - diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 1ea90a2..7a0a8ad 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -7,7 +7,6 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import java.time.Clock; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -26,15 +25,11 @@ public class ApplicationConfiguration @Bean public KeyCountingRebalanceListener keyCountingRebalanceListener( KeyCountingRecordHandler keyCountingRecordHandler, - PartitionStatisticsRepository repository, ApplicationProperties properties) { return new KeyCountingRebalanceListener( keyCountingRecordHandler, - repository, - properties.getClientId(), - Clock.systemDefaultZone(), - properties.getCommitInterval()); + properties.getClientId()); } @Bean diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 047d5cb..c7579b8 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -25,7 +25,7 @@ public class EndlessConsumer implements Runnable private final String id; private final String topic; private final Consumer consumer; - private final PollIntervalAwareConsumerRebalanceListener pollIntervalAwareRebalanceListener; + private final ConsumerRebalanceListener consumerRebalanceListener; private final RecordHandler handler; private final Lock lock = new ReentrantLock(); @@ -42,7 +42,7 @@ public class EndlessConsumer implements Runnable try { log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic), pollIntervalAwareRebalanceListener); + consumer.subscribe(Arrays.asList(topic), consumerRebalanceListener); while (true) { @@ -67,8 +67,6 @@ public class EndlessConsumer implements Runnable consumed++; } - - pollIntervalAwareRebalanceListener.beforeNextPoll(); } } catch(WakeupException e) diff --git a/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java b/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java index 636ff86..0ad1f31 100644 --- a/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java @@ -2,26 +2,20 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; -import java.time.Clock; -import java.time.Duration; -import java.time.Instant; import java.util.Collection; +import java.util.HashMap; import java.util.Map; @RequiredArgsConstructor @Slf4j -public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRebalanceListener +public class KeyCountingRebalanceListener implements ConsumerRebalanceListener { private final KeyCountingRecordHandler handler; - private final PartitionStatisticsRepository repository; private final String id; - private final Clock clock; - private final Duration commitInterval; - - private Instant lastCommit = Instant.EPOCH; @Override public void onPartitionsAssigned(Collection partitions) @@ -30,11 +24,7 @@ public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRe { Integer partition = tp.partition(); log.info("{} - adding partition: {}", id, partition); - StatisticsDocument document = - repository - .findById(Integer.toString(partition)) - .orElse(new StatisticsDocument(partition)); - handler.addPartition(partition, document.statistics); + handler.addPartition(partition, new HashMap<>()); }); } @@ -55,22 +45,6 @@ public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRe partition, key); } - repository.save(new StatisticsDocument(partition, removed)); }); } - - - @Override - public void beforeNextPoll() - { - if (lastCommit.plus(commitInterval).isBefore(clock.instant())) - { - log.debug("Storing data, last commit: {}", lastCommit); - handler.getSeen().forEach((partiton, statistics) -> repository.save( - new StatisticsDocument( - partiton, - statistics))); - lastCommit = clock.instant(); - } - } } diff --git a/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java b/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java deleted file mode 100644 index 0ccf3cd..0000000 --- a/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka; - -import org.springframework.data.mongodb.repository.MongoRepository; - -import java.util.Optional; - - -public interface PartitionStatisticsRepository extends MongoRepository -{ - public Optional findById(String partition); -} diff --git a/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java b/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java deleted file mode 100644 index 8abec12..0000000 --- a/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java +++ /dev/null @@ -1,9 +0,0 @@ -package de.juplo.kafka; - -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; - - -public interface PollIntervalAwareConsumerRebalanceListener extends ConsumerRebalanceListener -{ - default void beforeNextPoll() {} -} diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java index 3c9dd15..327ac9f 100644 --- a/src/main/java/de/juplo/kafka/RecordHandler.java +++ b/src/main/java/de/juplo/kafka/RecordHandler.java @@ -7,5 +7,4 @@ import java.util.function.Consumer; public interface RecordHandler extends Consumer> { - default void beforeNextPoll() {} } diff --git a/src/main/java/de/juplo/kafka/StatisticsDocument.java b/src/main/java/de/juplo/kafka/StatisticsDocument.java deleted file mode 100644 index 415ef5c..0000000 --- a/src/main/java/de/juplo/kafka/StatisticsDocument.java +++ /dev/null @@ -1,34 +0,0 @@ -package de.juplo.kafka; - -import lombok.ToString; -import org.springframework.data.annotation.Id; -import org.springframework.data.mongodb.core.mapping.Document; - -import java.util.HashMap; -import java.util.Map; - - -@Document(collection = "statistics") -@ToString -public class StatisticsDocument -{ - @Id - public String id; - public Map statistics; - - public StatisticsDocument() - { - } - - public StatisticsDocument(Integer partition) - { - this.id = Integer.toString(partition); - this.statistics = new HashMap<>(); - } - - public StatisticsDocument(Integer partition, Map statistics) - { - this.id = Integer.toString(partition); - this.statistics = statistics; - } -} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index fc1c68a..f8bfe7e 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -25,11 +25,6 @@ info: group-id: ${consumer.group-id} topic: ${consumer.topic} auto-offset-reset: ${consumer.auto-offset-reset} -spring: - data: - mongodb: - uri: mongodb://juplo:training@localhost:27017 - database: juplo logging: level: root: INFO diff --git a/src/test/java/de/juplo/kafka/ApplicationIT.java b/src/test/java/de/juplo/kafka/ApplicationIT.java index d1d8e50..2e6ac7d 100644 --- a/src/test/java/de/juplo/kafka/ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/ApplicationIT.java @@ -2,7 +2,6 @@ package de.juplo.kafka; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.web.client.TestRestTemplate; import org.springframework.boot.test.web.server.LocalServerPort; @@ -18,7 +17,6 @@ import static de.juplo.kafka.ApplicationTests.TOPIC; "consumer.topic=" + TOPIC, "spring.mongodb.embedded.version=4.4.13" }) @EmbeddedKafka(topics = TOPIC) -@AutoConfigureDataMongo public class ApplicationIT { public static final String TOPIC = "FOO"; diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 7f666f6..5b13b7d 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -12,7 +12,6 @@ import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; @@ -41,11 +40,9 @@ import static org.awaitility.Awaitility.*; properties = { "consumer.bootstrap-server=${spring.embedded.kafka.brokers}", "consumer.topic=" + TOPIC, - "consumer.commit-interval=1s", - "spring.mongodb.embedded.version=4.4.13" }) + "consumer.commit-interval=1s" }) @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) @EnableAutoConfiguration -@AutoConfigureDataMongo @Slf4j class ApplicationTests { diff --git a/src/test/java/de/juplo/kafka/TestRecordHandler.java b/src/test/java/de/juplo/kafka/TestRecordHandler.java index de28385..b4efdd6 100644 --- a/src/test/java/de/juplo/kafka/TestRecordHandler.java +++ b/src/test/java/de/juplo/kafka/TestRecordHandler.java @@ -19,10 +19,4 @@ public abstract class TestRecordHandler implements RecordHandler this.onNewRecord(record); handler.accept(record); } - @Override - - public void beforeNextPoll() - { - handler.beforeNextPoll(); - } }