From 61581ed5dfbb70f66390e7c3e9c261c6e6aa74d4 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 21 Aug 2022 10:01:42 +0200 Subject: [PATCH] =?utf8?q?Benennung=20vereinheitlicht=20und=20projektunabh?= =?utf8?q?=C3=A4ngig=20gemacht?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Angelehnt an die Code-Vereinheitlickung in `sumup-adder`. * Dabei: Setup und Skript angepasst und repariert. --- README.sh | 53 +++++++++---------- docker-compose.yml | 8 --- .../juplo/kafka/ApplicationConfiguration.java | 8 +-- ...ler.java => ApplicationRecordHandler.java} | 14 ++--- .../java/de/juplo/kafka/DriverController.java | 8 +-- .../java/de/juplo/kafka/ApplicationTests.java | 4 +- 6 files changed, 42 insertions(+), 53 deletions(-) rename src/main/java/de/juplo/kafka/{KeyCountingRecordHandler.java => ApplicationRecordHandler.java} (61%) diff --git a/README.sh b/README.sh index 13176d2..6dbc9d9 100755 --- a/README.sh +++ b/README.sh @@ -9,6 +9,7 @@ then exit fi +docker-compose stop producer consumer docker-compose up -d zookeeper kafka cli if [[ @@ -16,7 +17,8 @@ if [[ "$1" = "build" ]] then - mvn install || exit + docker-compose rm -svf consumer + mvn clean install || exit else echo "Using image existing images:" docker image ls $IMAGE @@ -24,7 +26,6 @@ fi echo "Waiting for the Kafka-Cluster to become ready..." docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 -docker-compose up -d kafka-ui docker-compose exec -T cli bash << 'EOF' echo "Creating topic with 3 partitions..." @@ -35,17 +36,12 @@ kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3 kafka-topics --bootstrap-server kafka:9092 --describe --topic test EOF -docker-compose up -d consumer +docker-compose up -d producer consumer +while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done +while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer..."; sleep 1; done +while [[ "$(http :8081/state | jq -r .)" == "{}" ]]; do echo "Waiting for some state to show up..."; done -docker-compose up -d producer -sleep 10 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen +http -v :8081/state docker-compose stop producer docker-compose exec -T cli bash << 'EOF' @@ -57,32 +53,33 @@ kafka-topics --bootstrap-server kafka:9092 --describe --topic test EOF docker-compose start producer +while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done + sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen +http -v :8081/state sleep 1 -http -v :8081/seen +http -v :8081/state sleep 1 -http -v :8081/seen +http -v :8081/state sleep 1 -http -v :8081/seen +http -v :8081/state sleep 1 -http -v :8081/seen +http -v :8081/state sleep 1 -http -v :8081/seen +http -v :8081/state sleep 1 -http -v :8081/seen +http -v :8081/state sleep 1 -http -v :8081/seen +http -v :8081/state sleep 1 -http -v :8081/seen +http -v :8081/state sleep 1 -http -v :8081/seen +http -v :8081/state sleep 1 -http -v :8081/seen +http -v :8081/state sleep 1 -http -v :8081/seen +http -v :8081/state sleep 1 -http -v :8081/seen -docker-compose stop producer consumer +http -v :8081/state + +docker-compose stop producer diff --git a/docker-compose.yml b/docker-compose.yml index df6b321..f13e04d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,14 +24,6 @@ services: depends_on: - zookeeper - kafka-ui: - image: provectuslabs/kafka-ui:0.3.3 - ports: - - 8080:8080 - environment: - KAFKA_CLUSTERS_0_NAME: local - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 - cli: image: juplo/toolbox command: sleep infinity diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index bb219d0..bf00b6d 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -17,16 +17,16 @@ import java.util.concurrent.Executors; public class ApplicationConfiguration { @Bean - public KeyCountingRecordHandler keyCountingRecordHandler() + public ApplicationRecordHandler recordHandler() { - return new KeyCountingRecordHandler(); + return new ApplicationRecordHandler(); } @Bean public EndlessConsumer endlessConsumer( KafkaConsumer kafkaConsumer, ExecutorService executor, - KeyCountingRecordHandler keyCountingRecordHandler, + ApplicationRecordHandler recordHandler, ApplicationProperties properties) { return @@ -35,7 +35,7 @@ public class ApplicationConfiguration properties.getClientId(), properties.getTopic(), kafkaConsumer, - keyCountingRecordHandler); + recordHandler); } @Bean diff --git a/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java similarity index 61% rename from src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java rename to src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 83b3ff2..3492c0d 100644 --- a/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -8,9 +8,9 @@ import java.util.Map; @Slf4j -public class KeyCountingRecordHandler implements RecordHandler +public class ApplicationRecordHandler implements RecordHandler { - private final Map> seen = new HashMap<>(); + private final Map> state = new HashMap<>(); @Override @@ -19,10 +19,10 @@ public class KeyCountingRecordHandler implements RecordHandler Integer partition = record.partition(); String key = record.key() == null ? "NULL" : record.key().toString(); - if (!seen.containsKey(partition)) - seen.put(partition, new HashMap<>()); + if (!state.containsKey(partition)) + state.put(partition, new HashMap<>()); - Map byKey = seen.get(partition); + Map byKey = state.get(partition); if (!byKey.containsKey(key)) byKey.put(key, 0l); @@ -33,8 +33,8 @@ public class KeyCountingRecordHandler implements RecordHandler } - public Map> getSeen() + public Map> getState() { - return seen; + return state; } } diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index f6ff47f..09fb762 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -13,7 +13,7 @@ import java.util.concurrent.ExecutionException; public class DriverController { private final EndlessConsumer consumer; - private final KeyCountingRecordHandler keyCountingRecordHandler; + private final ApplicationRecordHandler recordHandler; @PostMapping("start") @@ -29,10 +29,10 @@ public class DriverController } - @GetMapping("seen") - public Map> seen() + @GetMapping("state") + public Map> state() { - return keyCountingRecordHandler.getSeen(); + return recordHandler.getState(); } diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 0909f2c..ffc0a0b 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -65,7 +65,7 @@ class ApplicationTests @Autowired ExecutorService executor; @Autowired - KeyCountingRecordHandler keyCountingRecordHandler; + ApplicationRecordHandler recordHandler; EndlessConsumer endlessConsumer; Map oldOffsets; @@ -268,7 +268,7 @@ class ApplicationTests }); TestRecordHandler captureOffsetAndExecuteTestHandler = - new TestRecordHandler(keyCountingRecordHandler) { + new TestRecordHandler(recordHandler) { @Override public void onNewRecord(ConsumerRecord record) { -- 2.20.1