From: Kai Moritz
Date: Sun, 21 Aug 2022 08:33:09 +0000 (+0200)
Subject: Unified the naming and made it project-independent
X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Frebalance-listener;hp=de0468e4db973312e61ad4894edc092e84655161;p=demos%2Fkafka%2Ftraining

Unified the naming and made it project-independent

* Merge branch 'counting-consumer' into rebalance-listener
* Also renamed the class `KeyCountingRebalanceListener`, which is not
  present there, accordingly.
---

diff --git a/README.sh b/README.sh
index 13176d2..6dbc9d9 100755
--- a/README.sh
+++ b/README.sh
@@ -9,6 +9,7 @@ then
   exit
 fi
 
+docker-compose stop producer consumer
 docker-compose up -d zookeeper kafka cli
 
 if [[
@@ -16,7 +17,8 @@ if [[
   "$1" = "build"
 ]]
 then
-  mvn install || exit
+  docker-compose rm -svf consumer
+  mvn clean install || exit
 else
   echo "Using image existing images:"
   docker image ls $IMAGE
@@ -24,7 +26,6 @@ fi
 
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
-docker-compose up -d kafka-ui
 
 docker-compose exec -T cli bash << 'EOF'
 echo "Creating topic with 3 partitions..."
@@ -35,17 +36,12 @@ kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3
 kafka-topics --bootstrap-server kafka:9092 --describe --topic test
 EOF
 
-docker-compose up -d consumer
+docker-compose up -d producer consumer
+while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
+while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer..."; sleep 1; done
+while [[ "$(http :8081/state | jq -r .)" == "{}" ]]; do echo "Waiting for some state to show up..."; done
 
-docker-compose up -d producer
-sleep 10
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
+http -v :8081/state
 
 docker-compose stop producer
 docker-compose exec -T cli bash << 'EOF'
@@ -57,32 +53,33 @@ kafka-topics --bootstrap-server kafka:9092 --describe --topic test
 EOF
 
 docker-compose start producer
+while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
+
 sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
-docker-compose stop producer consumer
+http -v :8081/state
+
+docker-compose stop producer
diff --git a/docker-compose.yml b/docker-compose.yml
index df6b321..f13e04d 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -24,14 +24,6 @@ services:
     depends_on:
       - zookeeper
 
-  kafka-ui:
-    image: provectuslabs/kafka-ui:0.3.3
-    ports:
-      - 8080:8080
-    environment:
-      KAFKA_CLUSTERS_0_NAME: local
-      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
-
   cli:
     image: juplo/toolbox
     command: sleep infinity
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 7a0a8ad..e9c26fd 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -17,18 +17,18 @@ import java.util.concurrent.Executors;
 public class ApplicationConfiguration
 {
   @Bean
-  public KeyCountingRecordHandler keyCountingRecordHandler()
+  public ApplicationRecordHandler recordHandler()
   {
-    return new KeyCountingRecordHandler();
+    return new ApplicationRecordHandler();
   }
 
   @Bean
-  public KeyCountingRebalanceListener keyCountingRebalanceListener(
-      KeyCountingRecordHandler keyCountingRecordHandler,
+  public ApplicationRebalanceListener rebalanceListener(
+      ApplicationRecordHandler recordHandler,
       ApplicationProperties properties)
   {
-    return new KeyCountingRebalanceListener(
-        keyCountingRecordHandler,
+    return new ApplicationRebalanceListener(
+        recordHandler,
         properties.getClientId());
   }
 
@@ -36,8 +36,8 @@ public class ApplicationConfiguration
   public EndlessConsumer endlessConsumer(
       KafkaConsumer kafkaConsumer,
       ExecutorService executor,
-      KeyCountingRebalanceListener keyCountingRebalanceListener,
-      KeyCountingRecordHandler keyCountingRecordHandler,
+      ApplicationRebalanceListener rebalanceListener,
+      ApplicationRecordHandler recordHandler,
       ApplicationProperties properties)
   {
     return
@@ -46,8 +46,8 @@ public class ApplicationConfiguration
         properties.getClientId(),
         properties.getTopic(),
         kafkaConsumer,
-        keyCountingRebalanceListener,
-        keyCountingRecordHandler);
+        rebalanceListener,
+        recordHandler);
   }
 
   @Bean
diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
new file mode 100644
index 0000000..0dcadce
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
@@ -0,0 +1,50 @@
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ApplicationRebalanceListener implements ConsumerRebalanceListener
+{
+  private final ApplicationRecordHandler recordHandler;
+  private final String id;
+
+  @Override
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(tp ->
+    {
+      Integer partition = tp.partition();
+      log.info("{} - adding partition: {}", id, partition);
+      recordHandler.addPartition(partition, new HashMap<>());
+    });
+  }
+
+  @Override
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(tp ->
+    {
+      Integer partition = tp.partition();
+      log.info("{} - removing partition: {}", id, partition);
+      Map<String, Long> removed = recordHandler.removePartition(partition);
+      for (String key : removed.keySet())
+      {
+        log.info(
+            "{} - Seen {} messages for partition={}|key={}",
+            id,
+            removed.get(key),
+            partition,
+            key);
+      }
+    });
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
new file mode 100644
index 0000000..dfbf82e
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
@@ -0,0 +1,46 @@
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+@Slf4j
+public class ApplicationRecordHandler implements RecordHandler
+{
+  private final Map<Integer, Map<String, Long>> state = new HashMap<>();
+
+
+  @Override
+  public void accept(ConsumerRecord record)
+  {
+    Integer partition = record.partition();
+    String key = record.key() == null ? "NULL" : record.key().toString();
+    Map<String, Long> byKey = state.get(partition);
+
+    if (!byKey.containsKey(key))
+      byKey.put(key, 0l);
+
+    long seenByKey = byKey.get(key);
+    seenByKey++;
+    byKey.put(key, seenByKey);
+  }
+
+  public void addPartition(Integer partition, Map<String, Long> statistics)
+  {
+    state.put(partition, statistics);
+  }
+
+  public Map<String, Long> removePartition(Integer partition)
+  {
+    return state.remove(partition);
+  }
+
+
+  public Map<Integer, Map<String, Long>> getState()
+  {
+    return state;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java
index f6ff47f..09fb762 100644
--- a/src/main/java/de/juplo/kafka/DriverController.java
+++ b/src/main/java/de/juplo/kafka/DriverController.java
@@ -13,7 +13,7 @@ import java.util.concurrent.ExecutionException;
 public class DriverController
 {
   private final EndlessConsumer consumer;
-  private final KeyCountingRecordHandler keyCountingRecordHandler;
+  private final ApplicationRecordHandler recordHandler;
 
 
   @PostMapping("start")
@@ -29,10 +29,10 @@ public class DriverController
   }
 
 
-  @GetMapping("seen")
-  public Map<Integer, Map<String, Long>> seen()
+  @GetMapping("state")
+  public Map<Integer, Map<String, Long>> state()
   {
-    return keyCountingRecordHandler.getSeen();
+    return recordHandler.getState();
   }
 
 
diff --git a/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java b/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java
deleted file mode 100644
index 0ad1f31..0000000
--- a/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class KeyCountingRebalanceListener implements ConsumerRebalanceListener
-{
-  private final KeyCountingRecordHandler handler;
-  private final String id;
-
-  @Override
-  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(tp ->
-    {
-      Integer partition = tp.partition();
-      log.info("{} - adding partition: {}", id, partition);
-      handler.addPartition(partition, new HashMap<>());
-    });
-  }
-
-  @Override
-  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(tp ->
-    {
-      Integer partition = tp.partition();
-      log.info("{} - removing partition: {}", id, partition);
-      Map<String, Long> removed = handler.removePartition(partition);
-      for (String key : removed.keySet())
-      {
-        log.info(
-            "{} - Seen {} messages for partition={}|key={}",
-            id,
-            removed.get(key),
-            partition,
-            key);
-      }
-    });
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java b/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java
deleted file mode 100644
index 099dcf7..0000000
--- a/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-@Slf4j
-public class KeyCountingRecordHandler implements RecordHandler
-{
-  private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
-
-
-  @Override
-  public void accept(ConsumerRecord record)
-  {
-    Integer partition = record.partition();
-    String key = record.key() == null ? "NULL" : record.key().toString();
-    Map<String, Long> byKey = seen.get(partition);
-
-    if (!byKey.containsKey(key))
-      byKey.put(key, 0l);
-
-    long seenByKey = byKey.get(key);
-    seenByKey++;
-    byKey.put(key, seenByKey);
-  }
-
-  public void addPartition(Integer partition, Map<String, Long> statistics)
-  {
-    seen.put(partition, statistics);
-  }
-
-  public Map<String, Long> removePartition(Integer partition)
-  {
-    return seen.remove(partition);
-  }
-
-
-  public Map<Integer, Map<String, Long>> getSeen()
-  {
-    return seen;
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index 5b13b7d..d7eb039 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -65,9 +65,9 @@ class ApplicationTests
   @Autowired
   ExecutorService executor;
   @Autowired
-  KeyCountingRebalanceListener keyCountingRebalanceListener;
+  ApplicationRebalanceListener rebalanceListener;
   @Autowired
-  KeyCountingRecordHandler keyCountingRecordHandler;
+  ApplicationRecordHandler recordHandler;
 
   EndlessConsumer endlessConsumer;
   Map oldOffsets;
@@ -270,7 +270,7 @@ class ApplicationTests
       });
 
     TestRecordHandler captureOffsetAndExecuteTestHandler =
-      new TestRecordHandler(keyCountingRecordHandler) {
+      new TestRecordHandler(recordHandler) {
         @Override
         public void onNewRecord(ConsumerRecord record)
         {
@@ -287,7 +287,7 @@ class ApplicationTests
         properties.getClientId(),
         properties.getTopic(),
         kafkaConsumer,
-        keyCountingRebalanceListener,
+        rebalanceListener,
         captureOffsetAndExecuteTestHandler);
 
     endlessConsumer.start();