<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.awaitility</groupId>
- <artifactId>awaitility</artifactId>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
TOPIC,
"B");
+ CompletableFuture<Long> result;
+
instanceA.begin();
- CompletableFuture<Long> resultA1 = instanceA.send("a","message #1");
- await("Sending of 1. message for A is completed")
- .atMost(Duration.ofSeconds(5))
- .until(() -> resultA1.isDone());
- Assertions.assertThat(resultA1.get())
+ result = instanceA.send("a","message #1");
+ Assertions.assertThat(result.isCompletedExceptionally()).isFalse();
instanceB.begin();
- CompletableFuture<Long> resultB1 = instanceB.send("b","message #1");
- await("Sending of 1. message for B is completed")
- .atMost(Duration.ofSeconds(5))
- .until(() -> resultB1.isDone());
+ result = instanceB.send("b","message #1");
+ Assertions.assertThat(result.isCompletedExceptionally()).isFalse();
- CompletableFuture<Long> resultA2 = instanceA.send("a","message #2");
- await("Sending of 2. message for A is completed")
- .atMost(Duration.ofSeconds(5))
- .until(() -> resultA2.isDone());
+ result = instanceA.send("a","message #2");
+ Assertions.assertThat(result.isCompletedExceptionally()).isTrue();
- CompletableFuture<Long> resultB2 = instanceB.send("b","message #2");
- await("Sending of 2. message for B is completed")
- .atMost(Duration.ofSeconds(5))
- .until(() -> resultB2.isDone());
+ result = instanceB.send("b","message #2");
+ Assertions.assertThat(result.isCompletedExceptionally()).isFalse();
}
}