1 package de.juplo.kafka;
3 import lombok.extern.slf4j.Slf4j;
4 import org.junit.jupiter.api.*;
5 import org.junit.jupiter.api.extension.ExtendWith;
6 import org.springframework.beans.factory.annotation.Value;
7 import org.springframework.kafka.test.context.EmbeddedKafka;
8 import org.springframework.test.context.junit.jupiter.SpringExtension;
10 import java.time.Duration;
11 import java.util.concurrent.CompletableFuture;
13 import static de.juplo.kafka.ApplicationTests.PARTITIONS;
14 import static de.juplo.kafka.ApplicationTests.TOPIC;
15 import static org.awaitility.Awaitility.*;
18 @ExtendWith(SpringExtension.class)
19 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
21 public class ApplicationTests
23 static final String TOPIC = "FOO";
24 static final int PARTITIONS = 10;
26 @Value("${spring.embedded.kafka.brokers}")
27 String bootstrapServers;
30 void testConcurrentProducers() throws Exception
32 SimpleProducer instanceA = new SimpleProducer(
36 SimpleProducer instanceB = new SimpleProducer(
41 CompletableFuture<Long> result;
43 result = instanceA.send("a","message #1");
44 await("Sending of 1. message for A is completed")
45 .atMost(Duration.ofSeconds(5))
46 .until(() -> result.isDone());