WIP
authorKai Moritz <kai@juplo.de>
Wed, 24 Jan 2024 22:28:03 +0000 (23:28 +0100)
committerKai Moritz <kai@juplo.de>
Wed, 24 Jan 2024 22:28:03 +0000 (23:28 +0100)
src/test/java/de/juplo/kafka/ApplicationTests.java

index 370dbc2..c861034 100644 (file)
@@ -1,7 +1,7 @@
 package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
-import org.assertj.core.api.Assertions;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
 import org.junit.jupiter.api.*;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.springframework.beans.factory.annotation.Value;
@@ -13,6 +13,8 @@ import java.util.concurrent.CompletableFuture;
 
 import static de.juplo.kafka.ApplicationTests.PARTITIONS;
 import static de.juplo.kafka.ApplicationTests.TOPIC;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.awaitility.Awaitility.*;
 
 
@@ -41,11 +43,11 @@ public class ApplicationTests
       "FOO");
 
     instanceA.begin();
-    CompletableFuture<Long> resultA1 = instanceA.send("foo","message #1");
+    CompletableFuture<Long> resultA1 = instanceA.send("a","message #1");
                await("Sending of 1. message for A is completed")
                                .atMost(Duration.ofSeconds(5))
                                .until(() -> resultA1.isDone());
-    Assertions.assertThat(resultA1.isCompletedExceptionally()).isFalse();
+    assertThat(resultA1.isCompletedExceptionally()).isFalse();
 
     SimpleProducer instanceB = new SimpleProducer(
       bootstrapServers,
@@ -53,22 +55,25 @@ public class ApplicationTests
       "FOO");
 
     instanceB.begin();
-    CompletableFuture<Long> resultB1 = instanceB.send("foo","message #1");
+    CompletableFuture<Long> resultB1 = instanceB.send("b","message #1");
     await("Sending of 1. message for B is completed")
       .atMost(Duration.ofSeconds(5))
       .until(() -> resultB1.isDone());
-    Assertions.assertThat(resultB1.isCompletedExceptionally()).isFalse();
+    assertThat(resultB1.isCompletedExceptionally()).isFalse();
 
-    CompletableFuture<Long> resultA2 = instanceA.send("foo","message #2");
+    CompletableFuture<Long> resultA2 = instanceA.send("a","message #2");
     await("Sending of 2. message for A is completed")
       .atMost(Duration.ofSeconds(5))
       .until(() -> resultA2.isDone());
-    Assertions.assertThat(resultA2.isCompletedExceptionally()).isTrue();
+    assertThat(resultA2.isCompletedExceptionally()).isTrue();
 
-    CompletableFuture<Long> resultB2 = instanceB.send("foo","message #2");
+    CompletableFuture<Long> resultB2 = instanceB.send("b","message #2");
     await("Sending of 2. message for B is completed")
       .atMost(Duration.ofSeconds(5))
       .until(() -> resultB2.isDone());
-    Assertions.assertThat(resultB2.isCompletedExceptionally()).isFalse();
+    assertThat(resultB2.isCompletedExceptionally()).isFalse();
+
+    assertThatThrownBy(() -> instanceA.commit()).isInstanceOf(InvalidProducerEpochException.class);
+    instanceB.commit();
   }
 }