#!/bin/bash
-#
-# Führt so noch nix sinnvolles vor!
-# Skript passt nicht zu Implementierung.
-# Implementierung macht (noch?) keinen Sinn.
if [ "$1" = "cleanup" ]
then
docker image ls juplo/seek:1.0-SNAPSHOT
fi
-while ! [[ $(docker-compose exec kafka zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]];
-do
- echo "Waiting for kafka...";
- sleep 1;
-done
+docker-compose up -d peter franz
+echo "Waiting for the Kafka-Cluster to become ready..."
+docker-compose exec kafka cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
docker-compose exec kafka kafka-topics --zookeeper zookeeper:2181 --create --if-not-exists --replication-factor 1 --partitions 7 --topic test
docker-compose up -d producer
-docker-compose up -d peter franz beate ute klaus paul siggi
+
+sleep 3
+http -v post :8001/start
sleep 3
-docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group seek --reset-offsets --to-earliest
+echo 0 | http -v :8001/seek
sleep 3
-docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group seek --reset-offsets --to-earliest
+echo 0 | http -v :8001/seek
+
+http -v post :8002/start
+sleep 10
+echo 0 | http -v :8001/seek
sleep 3
-docker-compose exec kafka kafka-consumer-groups --bootstrap-server :9092 --group seek --reset-offsets --to-earliest
-docker-compose stop producer franz beate ute klaus paul sigi
-docker-compose logs consumer
+docker-compose stop producer peter franz
+docker-compose logs peter
services:
zookeeper:
- image: confluentinc/cp-zookeeper:6.0.1
+ image: confluentinc/cp-zookeeper:6.2.0
ports:
- 2181:2181
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
- image: confluentinc/cp-kafka:6.0.1
+ image: confluentinc/cp-kafka:6.2.0
ports:
- 9092:9092
environment:
- zookeeper
producer:
- image: confluentinc/cp-kafkacat:6.0.1
+ image: juplo/toolbox
command:
bash -c '
export A=0;
@Override
public void run()
{
- log.info("{} - Subscribing to topic test", id);
- consumer.subscribe(Arrays.asList(topic));
-
try
{
-
- running = true;
+ log.info("{} - Subscribing to topic test", id);
+ consumer.subscribe(Arrays.asList(topic));
while (running)
{
finally
{
log.info("{} - Unsubscribing...", id);
- consumer.unsubscribe();
running = false;
+ consumer.unsubscribe();
offset = null;
}
}
throw new RuntimeException("Consumier instance " + id + " is already running!");
log.info("Running {}", id);
+ running = true;
future = executor.submit(this);
}
future.get();
}
-
@PreDestroy
public void destroy() throws ExecutionException, InterruptedException
{