From bbf5810e5bd73f9dda532b2526e8442521f4f5d1 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 20 Nov 2022 18:07:03 +0100 Subject: [PATCH] =?utf8?q?Den=20`supersimple-producer`=20f=C3=BCr=20die=20?= =?utf8?q?Acks-=C3=9Cbung=20=C3=BCberarbeitet?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Damit der Producer in der Acks-Übung wiederverwendet werden kann, muss er endlos Nachrichten produzieren. * Für die Acks-Übung muss das Producing außerdem gedrosselt laufen. * Vorführung in `README.sh` entsprechend angepasst. * Da der Producer sich nie beendet, landet der Test in einer Endlosschleife * Daher musste der Test hier erstmal deaktiviert werden :/ --- README.sh | 13 +++++++---- docker-compose.yml | 23 +++++++++++++++++-- src/main/java/de/juplo/kafka/Application.java | 11 ++++++++- .../java/de/juplo/kafka/ApplicationTests.java | 2 ++ 4 files changed, 42 insertions(+), 7 deletions(-) diff --git a/README.sh b/README.sh index 270dfa8..10e79a0 100755 --- a/README.sh +++ b/README.sh @@ -10,7 +10,7 @@ then fi docker-compose up -d kafka-1 kafka-2 kafka-3 cli -docker-compose rm -svf producer +docker-compose rm -svf acks-all acks-1 acks-0 if [[ $(docker image ls -q $IMAGE) == "" || @@ -24,6 +24,11 @@ else fi docker-compose up setup -docker-compose up producer - -docker-compose exec cli kafkacat -b kafka:9092 -t test -q -e -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n' +docker-compose up -d acks-all acks-1 +sleep 5 +docker-compose stop kafka-1 +sleep 5 +docker-compose stop kafka-3 +sleep 5 +docker-compose stop acks-all acks-1 +docker-compose logs acks-all acks-1 diff --git a/docker-compose.yml b/docker-compose.yml index 34bb5f3..f63713d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -101,8 +101,27 @@ services: image: juplo/toolbox command: sleep infinity - producer: + acks-all: image: juplo/supersimple-producer:1.0-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 - spring.kafka.client-id: producer + spring.kafka.client-id: acks-all + spring.kafka.producer.acks: all + + acks-1: + image: juplo/supersimple-producer:1.0-SNAPSHOT + environment: + spring.kafka.bootstrap-servers: kafka:9092 + spring.kafka.client-id: acks-1 + spring.kafka.producer.acks: 1 + + acks-0: + image: juplo/supersimple-producer:1.0-SNAPSHOT + environment: + spring.kafka.bootstrap-servers: kafka:9092 + spring.kafka.client-id: acks-0 + spring.kafka.producer.acks: 0 + + consumer: + image: juplo/toolbox + command: kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n' diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index b304fa9..a0b96c2 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -21,7 +21,7 @@ public class Application implements ApplicationRunner @Override public void run(ApplicationArguments args) { - for (int i = 0; i < 100; i++) + for (int i = 0; true; i++) { // tag::callback[] ListenableFuture> listenableFuture = @@ -36,6 +36,15 @@ public class Application implements ApplicationRunner result.getRecordMetadata().offset()), e -> log.error("ERROR sendig message", e)); // end::callback[] + + try + { + Thread.sleep(500); + } + catch (InterruptedException e) + { + return; + } } } diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 714175e..7796c82 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -1,5 +1,6 @@ package de.juplo.kafka; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.test.context.EmbeddedKafka; @@ -13,6 +14,7 @@ import static de.juplo.kafka.ApplicationTests.TOPIC; "spring.kafka.template.default-topic=" + TOPIC }) @EmbeddedKafka(topics = TOPIC) +@Disabled public class ApplicationTests { public final static String TOPIC = "out"; -- 2.20.1