From: Kai Moritz Date: Sun, 21 Aug 2022 08:01:42 +0000 (+0200) Subject: Benennung vereinheitlicht und projektunabhängig gemacht X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Fcounting-consumer;p=demos%2Fkafka%2Ftraining Benennung vereinheitlicht und projektunabhängig gemacht * Angelehnt an die Code-Vereinheitlickung in `sumup-adder`. * Dabei: Setup und Skript angepasst und repariert. --- diff --git a/README.sh b/README.sh index 13176d2..6dbc9d9 100755 --- a/README.sh +++ b/README.sh @@ -9,6 +9,7 @@ then exit fi +docker-compose stop producer consumer docker-compose up -d zookeeper kafka cli if [[ @@ -16,7 +17,8 @@ if [[ "$1" = "build" ]] then - mvn install || exit + docker-compose rm -svf consumer + mvn clean install || exit else echo "Using image existing images:" docker image ls $IMAGE @@ -24,7 +26,6 @@ fi echo "Waiting for the Kafka-Cluster to become ready..." docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 -docker-compose up -d kafka-ui docker-compose exec -T cli bash << 'EOF' echo "Creating topic with 3 partitions..." @@ -35,17 +36,12 @@ kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3 kafka-topics --bootstrap-server kafka:9092 --describe --topic test EOF -docker-compose up -d consumer +docker-compose up -d producer consumer +while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done +while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer..."; sleep 1; done +while [[ "$(http :8081/state | jq -r .)" == "{}" ]]; do echo "Waiting for some state to show up..."; done -docker-compose up -d producer -sleep 10 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen +http -v :8081/state docker-compose stop producer docker-compose exec -T cli bash << 'EOF' @@ -57,32 +53,33 @@ kafka-topics --bootstrap-server kafka:9092 --describe --topic test EOF docker-compose start producer +while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done + sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen +http -v :8081/state sleep 1 -http -v :8081/seen +http -v :8081/state sleep 1 -http -v :8081/seen +http -v :8081/state sleep 1 -http -v :8081/seen +http -v :8081/state sleep 1 -http -v :8081/seen +http -v :8081/state sleep 1 -http -v :8081/seen +http -v :8081/state sleep 1 -http -v :8081/seen +http -v :8081/state sleep 1 -http -v :8081/seen +http -v :8081/state sleep 1 -http -v :8081/seen +http -v :8081/state sleep 1 -http -v :8081/seen +http -v :8081/state sleep 1 -http -v :8081/seen +http -v :8081/state sleep 1 -http -v :8081/seen +http -v :8081/state sleep 1 -http -v :8081/seen -docker-compose stop producer consumer +http -v :8081/state + +docker-compose stop producer diff --git a/docker-compose.yml b/docker-compose.yml index df6b321..f13e04d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,14 +24,6 @@ services: depends_on: - zookeeper - kafka-ui: - image: provectuslabs/kafka-ui:0.3.3 - ports: - - 8080:8080 - environment: - KAFKA_CLUSTERS_0_NAME: local - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 - cli: image: juplo/toolbox command: sleep infinity diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index bb219d0..bf00b6d 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -17,16 +17,16 @@ import java.util.concurrent.Executors; public class ApplicationConfiguration { @Bean - public KeyCountingRecordHandler keyCountingRecordHandler() + public ApplicationRecordHandler recordHandler() { - return new KeyCountingRecordHandler(); + return new ApplicationRecordHandler(); } @Bean public EndlessConsumer endlessConsumer( KafkaConsumer kafkaConsumer, ExecutorService executor, - KeyCountingRecordHandler keyCountingRecordHandler, + ApplicationRecordHandler recordHandler, ApplicationProperties properties) { return @@ -35,7 +35,7 @@ public class ApplicationConfiguration properties.getClientId(), properties.getTopic(), kafkaConsumer, - keyCountingRecordHandler); + recordHandler); } @Bean diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java new file mode 100644 index 0000000..3492c0d --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -0,0 +1,40 @@ +package de.juplo.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import java.util.HashMap; +import java.util.Map; + + +@Slf4j +public class ApplicationRecordHandler implements RecordHandler +{ + private final Map> state = new HashMap<>(); + + + @Override + public void accept(ConsumerRecord record) + { + Integer partition = record.partition(); + String key = record.key() == null ? "NULL" : record.key().toString(); + + if (!state.containsKey(partition)) + state.put(partition, new HashMap<>()); + + Map byKey = state.get(partition); + + if (!byKey.containsKey(key)) + byKey.put(key, 0l); + + long seenByKey = byKey.get(key); + seenByKey++; + byKey.put(key, seenByKey); + } + + + public Map> getState() + { + return state; + } +} diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index f6ff47f..09fb762 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -13,7 +13,7 @@ import java.util.concurrent.ExecutionException; public class DriverController { private final EndlessConsumer consumer; - private final KeyCountingRecordHandler keyCountingRecordHandler; + private final ApplicationRecordHandler recordHandler; @PostMapping("start") @@ -29,10 +29,10 @@ public class DriverController } - @GetMapping("seen") - public Map> seen() + @GetMapping("state") + public Map> state() { - return keyCountingRecordHandler.getSeen(); + return recordHandler.getState(); } diff --git a/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java b/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java deleted file mode 100644 index 83b3ff2..0000000 --- a/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java +++ /dev/null @@ -1,40 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; - -import java.util.HashMap; -import java.util.Map; - - -@Slf4j -public class KeyCountingRecordHandler implements RecordHandler -{ - private final Map> seen = new HashMap<>(); - - - @Override - public void accept(ConsumerRecord record) - { - Integer partition = record.partition(); - String key = record.key() == null ? "NULL" : record.key().toString(); - - if (!seen.containsKey(partition)) - seen.put(partition, new HashMap<>()); - - Map byKey = seen.get(partition); - - if (!byKey.containsKey(key)) - byKey.put(key, 0l); - - long seenByKey = byKey.get(key); - seenByKey++; - byKey.put(key, seenByKey); - } - - - public Map> getSeen() - { - return seen; - } -} diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 0909f2c..ffc0a0b 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -65,7 +65,7 @@ class ApplicationTests @Autowired ExecutorService executor; @Autowired - KeyCountingRecordHandler keyCountingRecordHandler; + ApplicationRecordHandler recordHandler; EndlessConsumer endlessConsumer; Map oldOffsets; @@ -268,7 +268,7 @@ class ApplicationTests }); TestRecordHandler captureOffsetAndExecuteTestHandler = - new TestRecordHandler(keyCountingRecordHandler) { + new TestRecordHandler(recordHandler) { @Override public void onNewRecord(ConsumerRecord record) {