WIP
authorKai Moritz <kai@juplo.de>
Wed, 24 Jan 2024 21:55:41 +0000 (22:55 +0100)
committerKai Moritz <kai@juplo.de>
Wed, 24 Jan 2024 21:55:41 +0000 (22:55 +0100)
pom.xml
src/main/java/de/juplo/kafka/SimpleProducer.java
src/test/java/de/juplo/kafka/ApplicationTests.java

diff --git a/pom.xml b/pom.xml
index 1c6d364..458b0e3 100644 (file)
--- a/pom.xml
+++ b/pom.xml
       <artifactId>awaitility</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.assertj</groupId>
+      <artifactId>assertj-core</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
index b9e8152..3cf1128 100644 (file)
@@ -58,6 +58,16 @@ public class SimpleProducer
     }
   }
 
+  void begin()
+  {
+    producer.beginTransaction();
+  }
+
+  void commit()
+  {
+    producer.commitTransaction();
+  }
+
   CompletableFuture<Long> send(String key, String value)
   {
     final CompletableFuture<Long> result = new CompletableFuture<>();
index d72ddfb..a93c974 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
+import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.*;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.springframework.beans.factory.annotation.Value;
@@ -43,11 +44,27 @@ public class ApplicationTests
       TOPIC,
       "B");
 
-    CompletableFuture<Long> result;
-
-    result = instanceA.send("a","message #1");
+    instanceA.begin();
+    CompletableFuture<Long> resultA1 = instanceA.send("a","message #1");
                await("Sending of 1. message for A is completed")
                                .atMost(Duration.ofSeconds(5))
-                               .until(() -> result.isDone());
-       }
+                               .until(() -> resultA1.isDone());
+    Assertions.assertThat(resultA1.get())
+
+    instanceB.begin();
+    CompletableFuture<Long> resultB1 = instanceB.send("b","message #1");
+    await("Sending of 1. message for B is completed")
+      .atMost(Duration.ofSeconds(5))
+      .until(() -> resultB1.isDone());
+
+    CompletableFuture<Long> resultA2 = instanceA.send("a","message #2");
+    await("Sending of 2. message for A is completed")
+      .atMost(Duration.ofSeconds(5))
+      .until(() -> resultA2.isDone());
+
+    CompletableFuture<Long> resultB2 = instanceB.send("b","message #2");
+    await("Sending of 2. message for B is completed")
+      .atMost(Duration.ofSeconds(5))
+      .until(() -> resultB2.isDone());
+  }
 }