package de.juplo.kafka;
import lombok.extern.slf4j.Slf4j;
-import org.assertj.core.api.Assertions;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Value;
import static de.juplo.kafka.ApplicationTests.PARTITIONS;
import static de.juplo.kafka.ApplicationTests.TOPIC;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.*;
"FOO");
instanceA.begin();
- CompletableFuture<Long> resultA1 = instanceA.send("foo","message #1");
+ CompletableFuture<Long> resultA1 = instanceA.send("a","message #1");
await("Sending of 1. message for A is completed")
.atMost(Duration.ofSeconds(5))
.until(() -> resultA1.isDone());
- Assertions.assertThat(resultA1.isCompletedExceptionally()).isFalse();
+ assertThat(resultA1.isCompletedExceptionally()).isFalse();
SimpleProducer instanceB = new SimpleProducer(
bootstrapServers,
"FOO");
instanceB.begin();
- CompletableFuture<Long> resultB1 = instanceB.send("foo","message #1");
+ CompletableFuture<Long> resultB1 = instanceB.send("b","message #1");
await("Sending of 1. message for B is completed")
.atMost(Duration.ofSeconds(5))
.until(() -> resultB1.isDone());
- Assertions.assertThat(resultB1.isCompletedExceptionally()).isFalse();
+ assertThat(resultB1.isCompletedExceptionally()).isFalse();
- CompletableFuture<Long> resultA2 = instanceA.send("foo","message #2");
+ CompletableFuture<Long> resultA2 = instanceA.send("a","message #2");
await("Sending of 2. message for A is completed")
.atMost(Duration.ofSeconds(5))
.until(() -> resultA2.isDone());
- Assertions.assertThat(resultA2.isCompletedExceptionally()).isTrue();
+ assertThat(resultA2.isCompletedExceptionally()).isTrue();
- CompletableFuture<Long> resultB2 = instanceB.send("foo","message #2");
+ CompletableFuture<Long> resultB2 = instanceB.send("b","message #2");
await("Sending of 2. message for B is completed")
.atMost(Duration.ofSeconds(5))
.until(() -> resultB2.isDone());
- Assertions.assertThat(resultB2.isCompletedExceptionally()).isFalse();
+ assertThat(resultB2.isCompletedExceptionally()).isFalse();
+
+ assertThatThrownBy(() -> instanceA.commit()).isInstanceOf(InvalidProducerEpochException.class);
+ instanceB.commit();
}
}