WIP
authorKai Moritz <kai@juplo.de>
Mon, 6 Sep 2021 20:21:29 +0000 (22:21 +0200)
committerKai Moritz <kai@juplo.de>
Mon, 6 Sep 2021 20:21:29 +0000 (22:21 +0200)
.maven-dockerexclude [new file with mode: 0644]
.maven-dockerinclude [new file with mode: 0644]
README.sh
docker-compose.yml
src/main/java/de/juplo/kafka/seek/Consumer.java

diff --git a/.maven-dockerexclude b/.maven-dockerexclude
new file mode 100644 (file)
index 0000000..72e8ffc
--- /dev/null
@@ -0,0 +1 @@
+*
diff --git a/.maven-dockerinclude b/.maven-dockerinclude
new file mode 100644 (file)
index 0000000..fd6cecd
--- /dev/null
@@ -0,0 +1 @@
+target/*.jar
index 16f8af5..09b1ba5 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -1,8 +1,4 @@
 #!/bin/bash
-#
-# Führt so noch nix sinnvolles vor!
-# Skript passt nicht zu Implementierung.
-# Implementierung macht (noch?) keinen Sinn.
 
 if [ "$1" = "cleanup" ]
 then
@@ -25,22 +21,25 @@ else
   docker image ls juplo/seek:1.0-SNAPSHOT
 fi
 
-while ! [[ $(docker-compose exec kafka zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]];
-do
-  echo "Waiting for kafka...";
-  sleep 1;
-done
+docker-compose up -d peter franz
+echo "Waiting for the Kafka-Cluster to become ready..."
+docker-compose exec kafka cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
 
 docker-compose exec kafka kafka-topics --zookeeper zookeeper:2181 --create --if-not-exists --replication-factor 1 --partitions 7 --topic test
 
 docker-compose up -d producer
-docker-compose up -d peter franz beate ute klaus paul siggi
+
+sleep 3
+http -v post :8001/start
 sleep 3
-docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group seek --reset-offsets --to-earliest 
+echo 0 | http -v :8001/seek
 sleep 3
-docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group seek --reset-offsets --to-earliest 
+echo 0 | http -v :8001/seek
+
+http -v post :8002/start
+sleep 10
+echo 0 | http -v :8001/seek
 sleep 3
-docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group seek --reset-offsets --to-earliest 
 
-docker-compose stop producer franz beate ute klaus paul sigi
-docker-compose logs consumer
+docker-compose stop producer peter franz
+docker-compose logs peter
index 383c4dd..3317df6 100644 (file)
@@ -3,14 +3,14 @@ version: "3"
 services:
 
   zookeeper:
-    image: confluentinc/cp-zookeeper:6.0.1
+    image: confluentinc/cp-zookeeper:6.2.0
     ports:
       - 2181:2181
     environment:
       ZOOKEEPER_CLIENT_PORT: 2181
 
   kafka:
-    image: confluentinc/cp-kafka:6.0.1
+    image: confluentinc/cp-kafka:6.2.0
     ports:
       - 9092:9092
     environment:
@@ -23,7 +23,7 @@ services:
       - zookeeper
 
   producer:
-    image: confluentinc/cp-kafkacat:6.0.1
+    image: juplo/toolbox
     command:
       bash -c '
       export A=0;
index 7376945..836ca13 100644 (file)
@@ -57,13 +57,10 @@ public class Consumer implements Runnable
   @Override
   public void run()
   {
-    log.info("{} - Subscribing to topic test", id);
-    consumer.subscribe(Arrays.asList(topic));
-
     try
     {
-
-      running = true;
+      log.info("{} - Subscribing to topic test", id);
+      consumer.subscribe(Arrays.asList(topic));
 
       while (running)
       {
@@ -102,8 +99,8 @@ public class Consumer implements Runnable
     finally
     {
       log.info("{} - Unsubscribing...", id);
-      consumer.unsubscribe();
       running = false;
+      consumer.unsubscribe();
       offset = null;
     }
   }
@@ -121,6 +118,7 @@ public class Consumer implements Runnable
       throw new RuntimeException("Consumier instance " + id + " is already running!");
 
     log.info("Running {}", id);
+    running = true;
     future = executor.submit(this);
   }
 
@@ -135,7 +133,6 @@ public class Consumer implements Runnable
     future.get();
   }
 
-
   @PreDestroy
   public void destroy() throws ExecutionException, InterruptedException
   {