import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.junit.jupiter.SpringExtension;
+import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import static de.juplo.kafka.ApplicationTests.PARTITIONS;
import static de.juplo.kafka.ApplicationTests.TOPIC;
+import static org.awaitility.Awaitility.*;
@ExtendWith(SpringExtension.class)
TOPIC,
"B");
- CompletableFuture<Long> result;
-
instanceA.begin();
- result = instanceA.send("a","message #1");
- Assertions.assertThat(result.isCompletedExceptionally()).isFalse();
+ CompletableFuture<Long> resultA1 = instanceA.send("a","message #1");
+ await("Sending of 1. message for A is completed")
+ .atMost(Duration.ofSeconds(5))
+ .until(() -> resultA1.isDone());
+ Assertions.assertThat(resultA1.isCompletedExceptionally()).isFalse();
instanceB.begin();
- result = instanceB.send("b","message #1");
- Assertions.assertThat(result.isCompletedExceptionally()).isFalse();
+ CompletableFuture<Long> resultB1 = instanceB.send("b","message #1");
+ await("Sending of 1. message for B is completed")
+ .atMost(Duration.ofSeconds(5))
+ .until(() -> resultB1.isDone());
+ Assertions.assertThat(resultB1.isCompletedExceptionally()).isFalse();
- result = instanceA.send("a","message #2");
- Assertions.assertThat(result.isCompletedExceptionally()).isTrue();
+ CompletableFuture<Long> resultA2 = instanceA.send("a","message #2");
+ await("Sending of 2. message for A is completed")
+ .atMost(Duration.ofSeconds(5))
+ .until(() -> resultA2.isDone());
+ // Assertions.assertThat(resultA2.isCompletedExceptionally()).isTrue();
+ Assertions.assertThat(resultA2.isCompletedExceptionally()).isFalse();
- result = instanceB.send("b","message #2");
- Assertions.assertThat(result.isCompletedExceptionally()).isFalse();
+ CompletableFuture<Long> resultB2 = instanceB.send("b","message #2");
+ await("Sending of 2. message for B is completed")
+ .atMost(Duration.ofSeconds(5))
+ .until(() -> resultB2.isDone());
+ Assertions.assertThat(resultB2.isCompletedExceptionally()).isFalse();
}
}