From: Kai Moritz Date: Wed, 24 Jan 2024 22:28:03 +0000 (+0100) Subject: WIP X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=00120793502ff770d3cc79319bcb547d1b370eb1;p=demos%2Fkafka%2Ftraining WIP --- diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 370dbc2..c861034 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -1,7 +1,7 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; -import org.assertj.core.api.Assertions; +import org.apache.kafka.common.errors.InvalidProducerEpochException; import org.junit.jupiter.api.*; import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.beans.factory.annotation.Value; @@ -13,6 +13,8 @@ import java.util.concurrent.CompletableFuture; import static de.juplo.kafka.ApplicationTests.PARTITIONS; import static de.juplo.kafka.ApplicationTests.TOPIC; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.*; @@ -41,11 +43,11 @@ public class ApplicationTests "FOO"); instanceA.begin(); - CompletableFuture resultA1 = instanceA.send("foo","message #1"); + CompletableFuture resultA1 = instanceA.send("a","message #1"); await("Sending of 1. message for A is completed") .atMost(Duration.ofSeconds(5)) .until(() -> resultA1.isDone()); - Assertions.assertThat(resultA1.isCompletedExceptionally()).isFalse(); + assertThat(resultA1.isCompletedExceptionally()).isFalse(); SimpleProducer instanceB = new SimpleProducer( bootstrapServers, @@ -53,22 +55,25 @@ public class ApplicationTests "FOO"); instanceB.begin(); - CompletableFuture resultB1 = instanceB.send("foo","message #1"); + CompletableFuture resultB1 = instanceB.send("b","message #1"); await("Sending of 1. message for B is completed") .atMost(Duration.ofSeconds(5)) .until(() -> resultB1.isDone()); - Assertions.assertThat(resultB1.isCompletedExceptionally()).isFalse(); + assertThat(resultB1.isCompletedExceptionally()).isFalse(); - CompletableFuture resultA2 = instanceA.send("foo","message #2"); + CompletableFuture resultA2 = instanceA.send("a","message #2"); await("Sending of 2. message for A is completed") .atMost(Duration.ofSeconds(5)) .until(() -> resultA2.isDone()); - Assertions.assertThat(resultA2.isCompletedExceptionally()).isTrue(); + assertThat(resultA2.isCompletedExceptionally()).isTrue(); - CompletableFuture resultB2 = instanceB.send("foo","message #2"); + CompletableFuture resultB2 = instanceB.send("b","message #2"); await("Sending of 2. message for B is completed") .atMost(Duration.ofSeconds(5)) .until(() -> resultB2.isDone()); - Assertions.assertThat(resultB2.isCompletedExceptionally()).isFalse(); + assertThat(resultB2.isCompletedExceptionally()).isFalse(); + + assertThatThrownBy(() -> instanceA.commit()).isInstanceOf(InvalidProducerEpochException.class); + instanceB.commit(); } }