From: Kai Moritz Date: Sat, 13 Aug 2022 15:48:24 +0000 (+0200) Subject: WIP X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=3495017ed2116f338c3342a313abdb7170683573;p=demos%2Fkafka%2Ftraining WIP --- diff --git a/README.sh b/README.sh index d166ac3..9298a99 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/endless-consumer:1.0-SNAPSHOT +IMAGE=juplo/sumup-consumer:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -16,7 +16,7 @@ if [[ "$1" = "build" ]] then - mvn install || exit + mvn install -D skipTests || exit else echo "Using image existing images:" docker image ls $IMAGE @@ -25,18 +25,22 @@ fi echo "Waiting for the Kafka-Cluster to become ready..." docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 docker-compose up setup -docker-compose up -d +docker-compose up -d producer consumer-1 while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-1..."; sleep 1; done -while ! [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-2..."; sleep 1; done -sleep 10 +echo -n START | http -v :8080/peter +echo -n 1 | http -v :8080/peter +echo -n 2 | http -v :8080/peter +echo -n 3 | http -v :8080/peter +echo -n 4 | http -v :8080/peter +echo -n 5 | http -v :8080/peter +echo -n 6 | http -v :8080/peter +http -v :8081/state +http -v :8081/state/peter +echo -n END | http -v :8080/peter +http -v :8081/state +http -v :8081/state/peter +docker-compose logs consumer-1 +docker-compose stop consumer-1 -docker-compose stop bart nerd riddler kraut poet linux - -http -v :8081/seen -http -v :8081/seen/bart -http -v :8082/seen -http -v :8082/seen/bart - -docker-compose stop consumer-1 consumer-2 diff --git a/docker-compose.yml b/docker-compose.yml index d855918..dfd4084 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -55,76 +55,6 @@ services: image: juplo/toolbox command: sleep infinity - bart: - image: juplo/wordcount--fortune:1.0.0 - command: bash -c " - while [ true ]; - do - /usr/games/fortune chalkboard - | head -1 - | http -v producer:8080/bart; - echo; - sleep 1; - done" - - nerd: - image: juplo/wordcount--fortune:1.0.0 - command: bash -c " - while [ true ]; - do - /usr/games/fortune computers - | grep -v '^[[:space:]]*--' - | http -v producer:8080/nerd; - echo; - sleep 1; - done" - - riddler: - image: juplo/wordcount--fortune:1.0.0 - command: bash -c " - while [ true ]; - do - /usr/games/fortune riddles - | awk -F':' '/^Q/ { print $$2 }' - | http -v producer:8080/riddler; - echo; - sleep 1; - sleep 1; - done" - - kraut: - image: juplo/wordcount--fortune:1.0.0 - command: bash -c " - while [ true ]; - do - /usr/games/fortune de - | http -v producer:8080/kraut; - echo; - sleep 1; - done" - - poet: - image: juplo/wordcount--fortune:1.0.0 - command: bash -c " - while [ true ]; - do - /usr/games/fortune songs-poems - | http -v producer:8080/poet; - echo; - sleep 1; - done" - - linux: - image: juplo/wordcount--fortune:1.0.0 - command: bash -c " - while [ true ]; - do - /usr/games/fortune linux - | http -v producer:8080/linux; - echo; - sleep 1; - done" - producer: image: juplo/rest-producer:1.0-SNAPSHOT ports: @@ -136,7 +66,7 @@ services: producer.topic: test consumer-1: - image: juplo/wordcount:1.0-SNAPSHOT + image: juplo/sumup-consumer:1.0-SNAPSHOT ports: - 8081:8080 environment: @@ -148,7 +78,7 @@ services: spring.data.mongodb.database: juplo consumer-2: - image: juplo/wordcount:1.0-SNAPSHOT + image: juplo/sumup-consumer:1.0-SNAPSHOT ports: - 8082:8080 environment: diff --git a/pom.xml b/pom.xml index dd282c5..e309ba9 100644 --- a/pom.xml +++ b/pom.xml @@ -12,9 +12,9 @@ de.juplo.kafka - sum + sumup-consumer 1.0-SNAPSHOT - Sum + Summing Up Consumer Calculates the sum of all natuarl numbers up to the given natural number diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index fdae76f..3aa9314 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -7,7 +7,9 @@ import org.springframework.web.bind.annotation.*; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; @RestController @@ -31,20 +33,27 @@ public class DriverController } - @GetMapping("seen") - public Map>> seen() + @GetMapping("state") + public Map> state() { - return sumRecordHandler.getSeen(); + return + sumRecordHandler + .getState() + .entrySet() + .stream() + .collect(Collectors.toMap( + entry -> entry.getKey(), + entry -> entry.getValue().getState())); } - @GetMapping("seen/{user}") - public ResponseEntity> seen(@PathVariable String user) + @GetMapping("state/{user}") + public ResponseEntity seen(@PathVariable String user) { - for (Map> users : sumRecordHandler.getSeen().values()) + for (SumBusinessLogic sumBusinessLogic : sumRecordHandler.getState().values()) { - List results = users.get(user); - if (results != null) - return ResponseEntity.ok(results); + Optional sum = sumBusinessLogic.getSum(user); + if (sum.isPresent()) + return ResponseEntity.ok(sum.get()); } return ResponseEntity.notFound().build(); diff --git a/src/main/java/de/juplo/kafka/StateDocument.java b/src/main/java/de/juplo/kafka/StateDocument.java index 52968cd..2583c8e 100644 --- a/src/main/java/de/juplo/kafka/StateDocument.java +++ b/src/main/java/de/juplo/kafka/StateDocument.java @@ -17,7 +17,6 @@ public class StateDocument public String id; public long offset = -1l; public Map state; - public Map> seen; public StateDocument() { @@ -27,16 +26,15 @@ public class StateDocument { this.id = Integer.toString(partition); this.state = new HashMap<>(); - this.seen = new HashMap<>(); } public StateDocument( Integer partition, Map state, - Map> seen) + long offset) { this.id = Integer.toString(partition); this.state = state; - this.seen = seen; + this.offset = offset; } } diff --git a/src/main/java/de/juplo/kafka/SumRebalanceListener.java b/src/main/java/de/juplo/kafka/SumRebalanceListener.java index be752ae..83cb759 100644 --- a/src/main/java/de/juplo/kafka/SumRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/SumRebalanceListener.java @@ -43,7 +43,7 @@ public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceL // Otherwise: Use initial offset, generated by Kafka consumer.seek(tp, document.offset); } - handler.addPartition(partition, document); + handler.addPartition(partition, document.state); }); } @@ -59,7 +59,7 @@ public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceL id, partition, newOffset); - repository.save(handler.removePartition(partition)); + repository.save(new StateDocument(partition, handler.removePartition(partition), newOffset)); }); } @@ -70,10 +70,10 @@ public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceL if (lastCommit.plus(commitInterval).isBefore(clock.instant())) { log.debug("Storing data and offsets, last commit: {}", lastCommit); - handler.getSeen().forEach((partiton, statistics) -> repository.save( + handler.getState().forEach((partiton, sumBusinessLogic) -> repository.save( new StateDocument( partiton, - statistics, + sumBusinessLogic.getState(), consumer.position(new TopicPartition(topic, partiton))))); lastCommit = clock.instant(); } diff --git a/src/main/java/de/juplo/kafka/SumRecordHandler.java b/src/main/java/de/juplo/kafka/SumRecordHandler.java index d4ec38f..b0fd27b 100644 --- a/src/main/java/de/juplo/kafka/SumRecordHandler.java +++ b/src/main/java/de/juplo/kafka/SumRecordHandler.java @@ -6,11 +6,12 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; + @Slf4j public class SumRecordHandler implements RecordHandler { - private final Map>> seen = new HashMap<>(); private final Map state = new HashMap<>(); @@ -24,36 +25,32 @@ public class SumRecordHandler implements RecordHandler { case "START": state.get(partition).startSum(user); - return; + break; case "END": Long result = state.get(partition).endSum(user); log.info("New result for {}: {}", user, result); - return; + break; default: state.get(partition).addToSum(user, Integer.parseInt(message)); - return; + break; } } - protected void addPartition(Integer partition, StateDocument document) + protected void addPartition(Integer partition, Map state) { - this.seen.put(partition, document.seen); - this.state.put(partition, new SumBusinessLogic(document.state)); + this.state.put(partition, new SumBusinessLogic(state)); } - protected StateDocument removePartition(Integer partition) + protected Map removePartition(Integer partition) { - return new StateDocument( - partition, - this.state.remove(partition).getState(), - this.seen.remove(partition)); + return this.state.remove(partition).getState(); } - public Map>> getSeen() + public Map getState() { - return seen; + return state; } }