From: Kai Moritz Date: Wed, 24 Jan 2024 22:12:05 +0000 (+0100) Subject: WIP X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=38dc0262f20223b450f22e92b73f9315bd7f52cb;p=demos%2Fkafka%2Ftraining WIP --- diff --git a/pom.xml b/pom.xml index 00042cc..458b0e3 100644 --- a/pom.xml +++ b/pom.xml @@ -38,6 +38,11 @@ spring-kafka-test test + + org.awaitility + awaitility + test + org.assertj assertj-core diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 8bf623a..75b9a31 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -8,10 +8,12 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.junit.jupiter.SpringExtension; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import static de.juplo.kafka.ApplicationTests.PARTITIONS; import static de.juplo.kafka.ApplicationTests.TOPIC; +import static org.awaitility.Awaitility.*; @ExtendWith(SpringExtension.class) @@ -42,20 +44,31 @@ public class ApplicationTests TOPIC, "B"); - CompletableFuture result; - instanceA.begin(); - result = instanceA.send("a","message #1"); - Assertions.assertThat(result.isCompletedExceptionally()).isFalse(); + CompletableFuture resultA1 = instanceA.send("a","message #1"); + await("Sending of 1. message for A is completed") + .atMost(Duration.ofSeconds(5)) + .until(() -> resultA1.isDone()); + Assertions.assertThat(resultA1.isCompletedExceptionally()).isFalse(); instanceB.begin(); - result = instanceB.send("b","message #1"); - Assertions.assertThat(result.isCompletedExceptionally()).isFalse(); + CompletableFuture resultB1 = instanceB.send("b","message #1"); + await("Sending of 1. message for B is completed") + .atMost(Duration.ofSeconds(5)) + .until(() -> resultB1.isDone()); + Assertions.assertThat(resultB1.isCompletedExceptionally()).isFalse(); - result = instanceA.send("a","message #2"); - Assertions.assertThat(result.isCompletedExceptionally()).isTrue(); + CompletableFuture resultA2 = instanceA.send("a","message #2"); + await("Sending of 2. message for A is completed") + .atMost(Duration.ofSeconds(5)) + .until(() -> resultA2.isDone()); + // Assertions.assertThat(resultA2.isCompletedExceptionally()).isTrue(); + Assertions.assertThat(resultA2.isCompletedExceptionally()).isFalse(); - result = instanceB.send("b","message #2"); - Assertions.assertThat(result.isCompletedExceptionally()).isFalse(); + CompletableFuture resultB2 = instanceB.send("b","message #2"); + await("Sending of 2. message for B is completed") + .atMost(Duration.ofSeconds(5)) + .until(() -> resultB2.isDone()); + Assertions.assertThat(resultB2.isCompletedExceptionally()).isFalse(); } }