1 package de.juplo.kafka;
3 import lombok.extern.slf4j.Slf4j;
4 import org.assertj.core.api.Assertions;
5 import org.junit.jupiter.api.*;
6 import org.junit.jupiter.api.extension.ExtendWith;
7 import org.springframework.beans.factory.annotation.Value;
8 import org.springframework.kafka.test.context.EmbeddedKafka;
9 import org.springframework.test.context.junit.jupiter.SpringExtension;
11 import java.util.concurrent.CompletableFuture;
13 import static de.juplo.kafka.ApplicationTests.PARTITIONS;
14 import static de.juplo.kafka.ApplicationTests.TOPIC;
// Test class run with the Spring JUnit 5 extension.
// NOTE(review): the annotation that owns the attributes below is not visible
// in this excerpt -- presumably @EmbeddedKafka (it is imported); confirm.
17 @ExtendWith(SpringExtension.class)
// Number of partitions is taken from the PARTITIONS constant declared in this class.
20 partitions = PARTITIONS,
// Transaction state log settings are both set to 1.
// NOTE(review): presumably required because only a single embedded broker is started -- confirm.
22 "transaction.state.log.replication.factor=1",
23 "transaction.state.log.min.isr=1" })
25 public class ApplicationTests
// Topic name and partition count; both are statically imported at the top of this file.
27 static final String TOPIC = "FOO";
28 static final int PARTITIONS = 10;
// Injected from the property "spring.embedded.kafka.brokers".
30 @Value("${spring.embedded.kafka.brokers}")
31 String bootstrapServers;
// Sends messages alternately through two SimpleProducer instances and checks,
// for each returned future, whether it completed exceptionally.
// Expected outcome as asserted below: the second send through instanceA fails,
// while the first send through instanceA and both sends through instanceB succeed.
// NOTE(review): the constructor arguments of both producers are not visible in
// this excerpt; presumably both share a transactional id so that instanceB
// fences instanceA -- confirm against the full file.
// NOTE(review): isCompletedExceptionally() is evaluated right after send()
// returns. This is only reliable if send() completes the future before
// returning; if send() is asynchronous the assertions may race -- confirm.
34 void testConcurrentProducers() throws Exception
36 SimpleProducer instanceA = new SimpleProducer(
40 SimpleProducer instanceB = new SimpleProducer(
// Holds the future returned by the most recent send; reassigned for every send.
45 CompletableFuture<Long> result;
// First send via instanceA: expected to succeed.
48 result = instanceA.send("a","message #1");
49 Assertions.assertThat(result.isCompletedExceptionally()).isFalse();
// First send via instanceB: expected to succeed.
52 result = instanceB.send("b","message #1");
53 Assertions.assertThat(result.isCompletedExceptionally()).isFalse();
// Second send via instanceA: expected to fail now that instanceB has sent.
55 result = instanceA.send("a","message #2");
56 Assertions.assertThat(result.isCompletedExceptionally()).isTrue();
// Second send via instanceB: expected to succeed.
58 result = instanceB.send("b","message #2");
59 Assertions.assertThat(result.isCompletedExceptionally()).isFalse();