From: Kai Moritz Date: Sat, 20 Aug 2022 16:22:25 +0000 (+0200) Subject: Vorlage für den 1. Teil der Summenformel-Übung X-Git-Tag: sumup-adder--vorlage---lvm-2-tage~4 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=dd9103fdedc432ba861abeaf5cfb4acb66749f15;p=demos%2Fkafka%2Ftraining Vorlage für den 1. Teil der Summenformel-Übung --- diff --git a/README.sh b/README.sh index f337d5c..bc2a674 100755 --- a/README.sh +++ b/README.sh @@ -28,63 +28,43 @@ fi echo "Waiting for the Kafka-Cluster to become ready..." docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 docker-compose up setup -docker-compose up -d gateway requests-1 requests-2 +docker-compose up -d gateway requests-1 requests-2 adder-1 adder-2 while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for gateway..."; sleep 1; done while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for requests-1..."; sleep 1; done while ! [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for requests-2..."; sleep 1; done +while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done +while ! [[ $(http 0:8092/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-2..."; sleep 1; done -docker-compose up -d peter klaus +echo 6 | http -v :8080/peter +echo 6 | http -v :8080/klaus -docker-compose up -d adder-1 -while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done while [[ "$(http :8091/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-1..."; sleep 1; done -http -v --pretty none -S :8091/results -echo - -sleep 3 echo "Resultate für adder-1" -http -v --pretty none -S :8091/results -echo +http -v :8091/results -echo "Resultate für peter von adder-1" -http :8091/results/peter | jq .[].sum | uniq -echo "Resultate für klaus von adder-1" -http :8091/results/klaus | jq .[].sum | uniq - - -docker-compose up -d adder-2 -while ! [[ $(http 0:8092/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-2..."; sleep 1; done while [[ "$(http :8092/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-2..."; sleep 1; done -http -v --pretty none -S :8092/results -echo - -sleep 3 echo "Resultate für adder-2" -http -v --pretty none -S :8092/results -echo - -echo "Resultate für peter von adder-1" -http :8091/results/peter | jq .[].sum | uniq -echo "Resultate für klaus von adder-1" -http :8091/results/klaus | jq .[].sum | uniq - -echo "Resultate für peter von adder-2" -http :8092/results/peter | jq .[].sum | uniq -echo "Resultate für klaus von adder-2" -http :8092/results/klaus | jq .[].sum | uniq +http -v :8092/results docker-compose stop adder-1 -until [ $(http --check-status :8092/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done -until [ $(http --check-status :8092/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done +echo 66 | http -v :8080/peter +echo 66 | http -v :8080/klaus +sleep 1 echo "Resultate für adder-2" -http -v --pretty none -S :8092/results -echo - -echo "Resultate für peter von adder-2" -http :8092/results/peter | jq .[].sum | uniq -echo "Resultate für klaus von adder-2" -http :8092/results/klaus | jq .[].sum | uniq +http -v :8092/results -docker-compose kill -s 9 peter klaus +docker-compose start adder-1 +while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done +while [[ "$(http :8091/results | jq -r .)" == "{}" ]]; +do + echo "Waiting for some results to show up on adder-1..."; + echo 666 | http -v :8080/peter + echo 666 | http -v :8080/klaus + sleep 1; +done +echo "Resultate für adder-1" +http -v :8091/results +echo "Resultate für adder-2" +http -v :8092/results diff --git a/docker-compose.yml b/docker-compose.yml index 5f0acac..8052a4c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -135,7 +135,6 @@ services: sumup.adder.throttle: 3ms spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017 spring.data.mongodb.database: juplo - logging.level.org.apache.kafka.clients.consumer: DEBUG adder-2: image: juplo/sumup-adder:1.0-SNAPSHOT @@ -149,7 +148,6 @@ services: sumup.adder.throttle: 3ms spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017 spring.data.mongodb.database: juplo - logging.level.org.apache.kafka.clients.consumer: DEBUG peter: image: juplo/toolbox diff --git a/src/main/java/de/juplo/kafka/AdderResults.java b/src/main/java/de/juplo/kafka/AdderResults.java index e7f5602..da9f65e 100644 --- a/src/main/java/de/juplo/kafka/AdderResults.java +++ b/src/main/java/de/juplo/kafka/AdderResults.java @@ -15,6 +15,12 @@ public class AdderResults { Map> resultsByUser = this.results.get(partition); + if (resultsByUser == null) + { + resultsByUser = new HashMap<>(); + results.put(partition, resultsByUser); + } + List results = resultsByUser.get(user); if (results == null) { diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index 0bfee67..e214a14 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -26,22 +26,6 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener { Integer partition = tp.partition(); log.info("{} - adding partition: {}", id, partition); - this.partitions.add(partition); - StateDocument document = - stateRepository - .findById(Integer.toString(partition)) - .orElse(new StateDocument(partition)); - recordHandler.addPartition(partition, document.state); - for (String user : document.state.keySet()) - { - log.info( - "{} - Restored state for partition={}|user={}: {}", - id, - partition, - user, - document.state.get(user)); - } - adderResults.addPartition(partition, document.results); }); } @@ -52,8 +36,7 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener { Integer partition = tp.partition(); log.info("{} - removing partition: {}", id, partition); - this.partitions.remove(partition); - Map state = recordHandler.removePartition(partition); + Map state = recordHandler.getState(partition).getState(); for (String user : state.keySet()) { log.info( @@ -63,8 +46,6 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener user, state.get(user)); } - Map> results = adderResults.removePartition(partition); - stateRepository.save(new StateDocument(partition, state, results)); }); } } diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 51d524f..e5e40ed 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -28,6 +28,9 @@ public class ApplicationRecordHandler implements RecordHandler String user = record.key(); String message = record.value(); + if (!state.containsKey(partition)) + state.put(partition, new AdderBusinessLogic()); + if (message.equals("CALCULATE")) { AdderResult result = state.get(partition).calculate(user);