From: Kai Moritz Date: Mon, 6 Sep 2021 20:21:29 +0000 (+0200) Subject: WIP X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=d6b305a86f7006eb52be48a9f233352523437e6d;p=demos%2Fkafka%2Fseek WIP --- diff --git a/.maven-dockerexclude b/.maven-dockerexclude new file mode 100644 index 0000000..72e8ffc --- /dev/null +++ b/.maven-dockerexclude @@ -0,0 +1 @@ +* diff --git a/.maven-dockerinclude b/.maven-dockerinclude new file mode 100644 index 0000000..fd6cecd --- /dev/null +++ b/.maven-dockerinclude @@ -0,0 +1 @@ +target/*.jar diff --git a/README.sh b/README.sh index 16f8af5..09b1ba5 100755 --- a/README.sh +++ b/README.sh @@ -1,8 +1,4 @@ #!/bin/bash -# -# Führt so noch nix sinnvolles vor! -# Skript passt nicht zu Implementierung. -# Implementierung macht (noch?) keinen Sinn. if [ "$1" = "cleanup" ] then @@ -25,22 +21,25 @@ else docker image ls juplo/seek:1.0-SNAPSHOT fi -while ! [[ $(docker-compose exec kafka zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; -do - echo "Waiting for kafka..."; - sleep 1; -done +docker-compose up -d peter franz +echo "Waiting for the Kafka-Cluster to become ready..." +docker-compose exec kafka cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 docker-compose exec kafka kafka-topics --zookeeper zookeeper:2181 --create --if-not-exists --replication-factor 1 --partitions 7 --topic test docker-compose up -d producer -docker-compose up -d peter franz beate ute klaus paul siggi + +sleep 3 +http -v post :8001/start sleep 3 -docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group seek --reset-offsets --to-earliest +echo 0 | http -v :8001/seek sleep 3 -docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group seek --reset-offsets --to-earliest +echo 0 | http -v :8001/seek + +http -v post :8002/start +sleep 10 +echo 0 | http -v :8001/seek sleep 3 -docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group seek --reset-offsets --to-earliest -docker-compose stop producer franz beate ute klaus paul sigi -docker-compose logs consumer +docker-compose stop producer peter franz +docker-compose logs peter diff --git a/docker-compose.yml b/docker-compose.yml index 383c4dd..3317df6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,14 +3,14 @@ version: "3" services: zookeeper: - image: confluentinc/cp-zookeeper:6.0.1 + image: confluentinc/cp-zookeeper:6.2.0 ports: - 2181:2181 environment: ZOOKEEPER_CLIENT_PORT: 2181 kafka: - image: confluentinc/cp-kafka:6.0.1 + image: confluentinc/cp-kafka:6.2.0 ports: - 9092:9092 environment: @@ -23,7 +23,7 @@ services: - zookeeper producer: - image: confluentinc/cp-kafkacat:6.0.1 + image: juplo/toolbox command: bash -c ' export A=0; diff --git a/src/main/java/de/juplo/kafka/seek/Consumer.java b/src/main/java/de/juplo/kafka/seek/Consumer.java index 7376945..836ca13 100644 --- a/src/main/java/de/juplo/kafka/seek/Consumer.java +++ b/src/main/java/de/juplo/kafka/seek/Consumer.java @@ -57,13 +57,10 @@ public class Consumer implements Runnable @Override public void run() { - log.info("{} - Subscribing to topic test", id); - consumer.subscribe(Arrays.asList(topic)); - try { - - running = true; + log.info("{} - Subscribing to topic test", id); + consumer.subscribe(Arrays.asList(topic)); while (running) { @@ -102,8 +99,8 @@ public class Consumer implements Runnable finally { log.info("{} - Unsubscribing...", id); - consumer.unsubscribe(); running = false; + consumer.unsubscribe(); offset = null; } } @@ -121,6 +118,7 @@ public class Consumer implements Runnable throw new RuntimeException("Consumier instance " + id + " is already running!"); log.info("Running {}", id); + running = true; future = executor.submit(this); } @@ -135,7 +133,6 @@ public class Consumer implements Runnable future.get(); } - @PreDestroy public void destroy() throws ExecutionException, InterruptedException {