1 package de.juplo.kafka;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.common.errors.InvalidProducerEpochException;
5 import org.junit.jupiter.api.*;
6 import org.junit.jupiter.api.extension.ExtendWith;
7 import org.springframework.beans.factory.annotation.Value;
8 import org.springframework.kafka.test.context.EmbeddedKafka;
9 import org.springframework.test.context.junit.jupiter.SpringExtension;
11 import java.time.Duration;
12 import java.util.concurrent.CompletableFuture;
14 import static de.juplo.kafka.ApplicationTests.PARTITIONS;
15 import static de.juplo.kafka.ApplicationTests.TOPIC;
16 import static org.assertj.core.api.Assertions.assertThat;
17 import static org.assertj.core.api.Assertions.assertThatThrownBy;
18 import static org.awaitility.Awaitility.*;
/**
 * Spring-based JUnit 5 test class that exercises {@code SimpleProducer}
 * instances against a Kafka broker whose address is injected via
 * {@code spring.embedded.kafka.brokers}.
 *
 * NOTE(review): the opening of the annotation that carries the
 * {@code partitions} attribute and the two transaction-log settings below is
 * not visible in this excerpt - presumably {@code @EmbeddedKafka}; confirm.
 */
21 @ExtendWith(SpringExtension.class)
// Number of partitions, taken from the PARTITIONS constant declared below.
24 partitions = PARTITIONS,
// Transaction state log settings, both set to 1.
// NOTE(review): presumably required because only a single broker is running - confirm.
26 "transaction.state.log.replication.factor=1",
27 "transaction.state.log.min.isr=1" })
29 public class ApplicationTests
// Topic name constant (also imported statically at the top of the file).
31 static final String TOPIC = "FOO";
// Partition count constant, referenced by the annotation above.
32 static final int PARTITIONS = 10;
// Injected from the property spring.embedded.kafka.brokers.
34 @Value("${spring.embedded.kafka.brokers}")
35 String bootstrapServers;
/**
 * Runs two {@code SimpleProducer} instances one after the other and checks
 * which of their sends succeed.
 *
 * Expected sequence, as asserted below:
 * 1. A sends "message #1" - completes normally.
 * 2. B is created and sends "message #1" - completes normally.
 * 3. A sends "message #2" - completes exceptionally.
 * 4. B sends "message #2" - completes normally.
 * 5. {@code instanceA.commit()} throws {@link InvalidProducerEpochException}.
 *
 * NOTE(review): the constructor arguments of both producers are not visible
 * in this excerpt. Presumably both use the same transactional id, so that
 * creating B fences A - confirm against the full source.
 */
38 void testConcurrentProducers() throws Exception
// First producer instance (constructor arguments not visible here).
40 SimpleProducer instanceA = new SimpleProducer(
// A sends its first message with key "a"; wait at most 5 seconds for the
// returned future to finish, then require that it did not fail.
46 CompletableFuture<Long> resultA1 = instanceA.send("a","message #1");
47 await("Sending of 1. message for A is completed")
48 .atMost(Duration.ofSeconds(5))
49 .until(() -> resultA1.isDone());
50 assertThat(resultA1.isCompletedExceptionally()).isFalse();
// Second producer instance, created only after A's first send has completed
// (constructor arguments not visible here).
52 SimpleProducer instanceB = new SimpleProducer(
// B sends its first message with key "b"; it must complete without error.
58 CompletableFuture<Long> resultB1 = instanceB.send("b","message #1");
59 await("Sending of 1. message for B is completed")
60 .atMost(Duration.ofSeconds(5))
61 .until(() -> resultB1.isDone());
62 assertThat(resultB1.isCompletedExceptionally()).isFalse();
// A's second send is expected to FAIL: the future must complete exceptionally.
64 CompletableFuture<Long> resultA2 = instanceA.send("a","message #2");
65 await("Sending of 2. message for A is completed")
66 .atMost(Duration.ofSeconds(5))
67 .until(() -> resultA2.isDone());
68 assertThat(resultA2.isCompletedExceptionally()).isTrue();
// B's second send must still complete without error.
70 CompletableFuture<Long> resultB2 = instanceB.send("b","message #2");
71 await("Sending of 2. message for B is completed")
72 .atMost(Duration.ofSeconds(5))
73 .until(() -> resultB2.isDone());
74 assertThat(resultB2.isCompletedExceptionally()).isFalse();
// Committing on A must be rejected with InvalidProducerEpochException.
76 assertThatThrownBy(() -> instanceA.commit()).isInstanceOf(InvalidProducerEpochException.class);