WIP
authorKai Moritz <kai@juplo.de>
Wed, 24 Jan 2024 22:12:05 +0000 (23:12 +0100)
committerKai Moritz <kai@juplo.de>
Wed, 24 Jan 2024 22:12:05 +0000 (23:12 +0100)
pom.xml
src/test/java/de/juplo/kafka/ApplicationTests.java

diff --git a/pom.xml b/pom.xml
index 00042cc..458b0e3 100644 (file)
--- a/pom.xml
+++ b/pom.xml
       <artifactId>spring-kafka-test</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.assertj</groupId>
       <artifactId>assertj-core</artifactId>
index 8bf623a..75b9a31 100644 (file)
@@ -8,10 +8,12 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.context.junit.jupiter.SpringExtension;
 
+import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
 
 import static de.juplo.kafka.ApplicationTests.PARTITIONS;
 import static de.juplo.kafka.ApplicationTests.TOPIC;
+import static org.awaitility.Awaitility.*;
 
 
 @ExtendWith(SpringExtension.class)
@@ -42,20 +44,31 @@ public class ApplicationTests
       TOPIC,
       "B");
 
-    CompletableFuture<Long> result;
-
     instanceA.begin();
-    result = instanceA.send("a","message #1");
-    Assertions.assertThat(result.isCompletedExceptionally()).isFalse();
+    CompletableFuture<Long> resultA1 = instanceA.send("a","message #1");
+               await("Sending of 1. message for A is completed")
+                               .atMost(Duration.ofSeconds(5))
+                               .until(() -> resultA1.isDone());
+    Assertions.assertThat(resultA1.isCompletedExceptionally()).isFalse();
 
     instanceB.begin();
-    result = instanceB.send("b","message #1");
-    Assertions.assertThat(result.isCompletedExceptionally()).isFalse();
+    CompletableFuture<Long> resultB1 = instanceB.send("b","message #1");
+    await("Sending of 1. message for B is completed")
+      .atMost(Duration.ofSeconds(5))
+      .until(() -> resultB1.isDone());
+    Assertions.assertThat(resultB1.isCompletedExceptionally()).isFalse();
 
-    result = instanceA.send("a","message #2");
-    Assertions.assertThat(result.isCompletedExceptionally()).isTrue();
+    CompletableFuture<Long> resultA2 = instanceA.send("a","message #2");
+    await("Sending of 2. message for A is completed")
+      .atMost(Duration.ofSeconds(5))
+      .until(() -> resultA2.isDone());
+    // Assertions.assertThat(resultA2.isCompletedExceptionally()).isTrue();
+    Assertions.assertThat(resultA2.isCompletedExceptionally()).isFalse();
 
-    result = instanceB.send("b","message #2");
-    Assertions.assertThat(result.isCompletedExceptionally()).isFalse();
+    CompletableFuture<Long> resultB2 = instanceB.send("b","message #2");
+    await("Sending of 2. message for B is completed")
+      .atMost(Duration.ofSeconds(5))
+      .until(() -> resultB2.isDone());
+    Assertions.assertThat(resultB2.isCompletedExceptionally()).isFalse();
   }
 }