Den `supersimple-producer` für die Acks-Übung überarbeitet
authorKai Moritz <kai@juplo.de>
Sun, 20 Nov 2022 17:07:03 +0000 (18:07 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 20 Nov 2022 18:16:35 +0000 (19:16 +0100)
* Damit der Producer in der Acks-Übung wiederverwendet werden kann, muss
  er endlos Nachrichten produzieren.
* Für die Acks-Übung muss das Producing außerdem gedrosselt laufen.
* Vorführung in `README.sh` entsprechend angepasst.
* Da der Producer sich nie beendet, landet der Test in einer Endlosschleife
* Daher musste der Test hier erstmal deaktiviert werden :/

README.sh
docker-compose.yml
src/main/java/de/juplo/kafka/Application.java
src/test/java/de/juplo/kafka/ApplicationTests.java

index 270dfa8..10e79a0 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -10,7 +10,7 @@ then
 fi
 
 docker-compose up -d kafka-1 kafka-2 kafka-3 cli
-docker-compose rm -svf producer
+docker-compose rm -svf acks-all acks-1 acks-0
 
 if [[
   $(docker image ls -q $IMAGE) == "" ||
@@ -24,6 +24,11 @@ else
 fi
 
 docker-compose up setup
-docker-compose up producer
-
-docker-compose exec cli kafkacat -b kafka:9092 -t test -q -e -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n'
+docker-compose up -d acks-all acks-1
+sleep 5
+docker-compose stop kafka-1
+sleep 5
+docker-compose stop kafka-3
+sleep 5
+docker-compose stop acks-all acks-1
+docker-compose logs acks-all acks-1
index 34bb5f3..f63713d 100644 (file)
@@ -101,8 +101,27 @@ services:
     image: juplo/toolbox
     command: sleep infinity
 
-  producer:
+  acks-all:
     image: juplo/supersimple-producer:1.0-SNAPSHOT
     environment:
       spring.kafka.bootstrap-servers: kafka:9092
-      spring.kafka.client-id: producer
+      spring.kafka.client-id: acks-all
+      spring.kafka.producer.acks: all
+
+  acks-1:
+    image: juplo/supersimple-producer:1.0-SNAPSHOT
+    environment:
+      spring.kafka.bootstrap-servers: kafka:9092
+      spring.kafka.client-id: acks-1
+      spring.kafka.producer.acks: 1
+
+  acks-0:
+    image: juplo/supersimple-producer:1.0-SNAPSHOT
+    environment:
+      spring.kafka.bootstrap-servers: kafka:9092
+      spring.kafka.client-id: acks-0
+      spring.kafka.producer.acks: 0
+
+  consumer:
+    image: juplo/toolbox
+    command: kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n'
index b304fa9..a0b96c2 100644 (file)
@@ -21,7 +21,7 @@ public class Application implements ApplicationRunner
   @Override
   public void run(ApplicationArguments args)
   {
-    for (int i = 0; i < 100; i++)
+    for (int i = 0; true; i++)
     {
       // tag::callback[]
       ListenableFuture<SendResult<String, String>> listenableFuture =
@@ -36,6 +36,15 @@ public class Application implements ApplicationRunner
               result.getRecordMetadata().offset()),
           e -> log.error("ERROR sendig message", e));
       // end::callback[]
+
+      try
+      {
+        Thread.sleep(500);
+      }
+      catch (InterruptedException e)
+      {
+        return;
+      }
     }
   }
 
index 714175e..7796c82 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka;
 
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.kafka.test.context.EmbeddedKafka;
@@ -13,6 +14,7 @@ import static de.juplo.kafka.ApplicationTests.TOPIC;
         "spring.kafka.template.default-topic=" + TOPIC
     })
 @EmbeddedKafka(topics = TOPIC)
+@Disabled
 public class ApplicationTests
 {
   public final static String TOPIC = "out";