package de.juplo.kafka;
import lombok.extern.slf4j.Slf4j;
+import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Value;
TOPIC,
"B");
- CompletableFuture<Long> result;
-
- result = instanceA.send("a","message #1");
+ instanceA.begin();
+ CompletableFuture<Long> resultA1 = instanceA.send("a","message #1");
await("Sending of 1. message for A is completed")
.atMost(Duration.ofSeconds(5))
- .until(() -> result.isDone());
- }
+ .until(() -> resultA1.isDone());
+ Assertions.assertThat(resultA1.get())
+
+ instanceB.begin();
+ CompletableFuture<Long> resultB1 = instanceB.send("b","message #1");
+ await("Sending of 1. message for B is completed")
+ .atMost(Duration.ofSeconds(5))
+ .until(() -> resultB1.isDone());
+
+ CompletableFuture<Long> resultA2 = instanceA.send("a","message #2");
+ await("Sending of 2. message for A is completed")
+ .atMost(Duration.ofSeconds(5))
+ .until(() -> resultA2.isDone());
+
+ CompletableFuture<Long> resultB2 = instanceB.send("b","message #2");
+ await("Sending of 2. message for B is completed")
+ .atMost(Duration.ofSeconds(5))
+ .until(() -> resultB2.isDone());
+ }
}