From: Kai Moritz Date: Sun, 24 Jul 2022 10:34:53 +0000 (+0200) Subject: Merge der Refaktorisierung des EndlessConsumer (Branch 'deserialization') X-Git-Tag: sumup-requests---lvm-2-tage~5^2^2^2~2 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=83a4bf324f5a7ec6010a7921118ec7d6e8f997cf;hp=0ac94b34af644f6fa5a0556fc7e2bd322167c608;p=demos%2Fkafka%2Ftraining Merge der Refaktorisierung des EndlessConsumer (Branch 'deserialization') * Um die Implementierung besser testen zu können, wurde die Anwendung in dem Branch 'deserialization' refaktorisiert. * Diese Refaktorisierung werden hier zusammen mit den eingeführten Tests gemerged. * Der so verfügbar gemachte Test wurde so angepasst, dass er das Speichern des Zustands in einer MongoDB berücksichtigt. --- diff --git a/README.sh b/README.sh index 72f0c60..39e9300 100755 --- a/README.sh +++ b/README.sh @@ -9,7 +9,7 @@ then exit fi -docker-compose up -d zookeeper kafka cli +docker-compose up -d zookeeper kafka cli mongo express if [[ $(docker image ls -q $IMAGE) == "" || @@ -24,23 +24,65 @@ fi echo "Waiting for the Kafka-Cluster to become ready..." docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 -docker-compose up setup -docker-compose up -d producer consumer -sleep 5 +docker-compose up -d kafka-ui + docker-compose exec -T cli bash << 'EOF' -echo "Writing poison pill into topic test..." -# tag::poisonpill[] -echo 'BOOM!' | kafkacat -P -b kafka:9092 -t test -# end::poisonpill[] +echo "Creating topic with 3 partitions..." +kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test +# tag::createtopic[] +kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3 +# end::createtopic[] +kafka-topics --bootstrap-server kafka:9092 --describe --topic test EOF -while [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Consumer is still running..."; sleep 1; done -http -v :8081/actuator/health -echo "Restarting consumer" -http -v post :8081/start -while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer..."; sleep 1; done -while [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Consumer is still running..."; sleep 1; done -http -v :8081/actuator/health -http -v post :8081/actuator/shutdown + +docker-compose up -d consumer + +docker-compose up -d producer +sleep 10 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen + docker-compose stop producer -docker-compose ps -docker-compose logs --tail=100 consumer +docker-compose exec -T cli bash << 'EOF' +echo "Altering number of partitions from 3 to 7..." +# tag::altertopic[] +kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 7 +kafka-topics --bootstrap-server kafka:9092 --describe --topic test +# end::altertopic[] +EOF + +docker-compose start producer +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +docker-compose stop producer consumer diff --git a/docker-compose.yml b/docker-compose.yml index 159f9cb..30ae3b4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,13 +24,30 @@ services: depends_on: - zookeeper - setup: - image: juplo/toolbox - command: > - bash -c " - kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test - kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2 - " + mongo: + image: mongo:4.4.13 + ports: + - 27017:27017 + environment: + MONGO_INITDB_ROOT_USERNAME: juplo + MONGO_INITDB_ROOT_PASSWORD: training + + express: + image: mongo-express + ports: + - 8090:8081 + environment: + ME_CONFIG_MONGODB_ADMINUSERNAME: juplo + ME_CONFIG_MONGODB_ADMINPASSWORD: training + ME_CONFIG_MONGODB_URL: mongodb://juplo:training@mongo:27017/ + + kafka-ui: + image: provectuslabs/kafka-ui:0.3.3 + ports: + - 8080:8080 + environment: + KAFKA_CLUSTERS_0_NAME: local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 cli: image: juplo/toolbox @@ -45,7 +62,7 @@ services: producer.bootstrap-server: kafka:9092 producer.client-id: producer producer.topic: test - producer.throttle-ms: 200 + producer.throttle-ms: 10 consumer: @@ -55,6 +72,7 @@ services: environment: server.port: 8080 consumer.bootstrap-server: kafka:9092 - consumer.client-id: my-group consumer.client-id: consumer consumer.topic: test + spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017 + spring.data.mongodb.database: juplo diff --git a/pom.xml b/pom.xml index 1f5caab..701704d 100644 --- a/pom.xml +++ b/pom.xml @@ -21,6 +21,10 @@ org.springframework.boot spring-boot-starter-web + + org.springframework.boot + spring-boot-starter-data-mongodb + org.springframework.boot spring-boot-starter-validation @@ -57,6 +61,11 @@ awaitility test + + de.flapdoodle.embed + de.flapdoodle.embed.mongo + test + diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index b5bd1b9..d280aa6 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -8,7 +8,6 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import javax.annotation.PreDestroy; -import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 4054e93..1ba9d5b 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -32,11 +32,13 @@ public class ApplicationConfiguration KafkaConsumer kafkaConsumer, ExecutorService executor, Consumer> handler, + PartitionStatisticsRepository repository, ApplicationProperties properties) { return new EndlessConsumer<>( executor, + repository, properties.getClientId(), properties.getTopic(), kafkaConsumer, diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 8802df9..a21dd86 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantLock; public class EndlessConsumer implements ConsumerRebalanceListener, Runnable { private final ExecutorService executor; + private final PartitionStatisticsRepository repository; private final String id; private final String topic; private final Consumer consumer; @@ -62,6 +63,7 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl partition, key); } + repository.save(new StatisticsDocument(partition, removed)); }); } @@ -74,7 +76,12 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl Long offset = consumer.position(tp); log.info("{} - adding partition: {}, offset={}", id, partition, offset); offsets.put(partition, offset); - seen.put(partition, new HashMap<>()); + seen.put( + partition, + repository + .findById(Integer.toString(tp.partition())) + .map(document -> document.statistics) + .orElse(new HashMap<>())); }); } diff --git a/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java b/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java new file mode 100644 index 0000000..0ccf3cd --- /dev/null +++ b/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +import org.springframework.data.mongodb.repository.MongoRepository; + +import java.util.Optional; + + +public interface PartitionStatisticsRepository extends MongoRepository +{ + public Optional findById(String partition); +} diff --git a/src/main/java/de/juplo/kafka/StatisticsDocument.java b/src/main/java/de/juplo/kafka/StatisticsDocument.java new file mode 100644 index 0000000..2416253 --- /dev/null +++ b/src/main/java/de/juplo/kafka/StatisticsDocument.java @@ -0,0 +1,28 @@ +package de.juplo.kafka; + +import lombok.ToString; +import org.springframework.data.annotation.Id; +import org.springframework.data.mongodb.core.mapping.Document; + +import java.util.HashMap; +import java.util.Map; + + +@Document(collection = "statistics") +@ToString +public class StatisticsDocument +{ + @Id + public String id; + public Map statistics; + + public StatisticsDocument() + { + } + + public StatisticsDocument(Integer partition, Map statistics) + { + this.id = Integer.toString(partition); + this.statistics = statistics; + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 9f3cb81..93b27c2 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -24,6 +24,11 @@ info: group-id: ${consumer.group-id} topic: ${consumer.topic} auto-offset-reset: ${consumer.auto-offset-reset} +spring: + data: + mongodb: + uri: mongodb://juplo:training@localhost:27017 + database: juplo logging: level: root: INFO diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 40dc149..caa25c5 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -11,6 +11,8 @@ import org.apache.kafka.common.serialization.*; import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; @@ -40,8 +42,11 @@ import static org.awaitility.Awaitility.*; @TestPropertySource( properties = { "consumer.bootstrap-server=${spring.embedded.kafka.brokers}", - "consumer.topic=" + TOPIC }) + "consumer.topic=" + TOPIC, + "spring.mongodb.embedded.version=4.4.13" }) @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) +@EnableAutoConfiguration +@AutoConfigureDataMongo @Slf4j class ApplicationTests { @@ -63,6 +68,8 @@ class ApplicationTests ApplicationProperties properties; @Autowired ExecutorService executor; + @Autowired + PartitionStatisticsRepository repository; Consumer> testHandler; EndlessConsumer endlessConsumer; @@ -255,6 +262,7 @@ class ApplicationTests endlessConsumer = new EndlessConsumer<>( executor, + repository, properties.getClientId(), properties.getTopic(), kafkaConsumer,