WIP
[demos/kafka/training] / src / test / java / de / juplo / kafka / ApplicationTests.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.junit.jupiter.api.*;
5 import org.junit.jupiter.api.extension.ExtendWith;
6 import org.springframework.beans.factory.annotation.Value;
7 import org.springframework.kafka.test.context.EmbeddedKafka;
8 import org.springframework.test.context.junit.jupiter.SpringExtension;
9
10 import java.time.Duration;
11 import java.util.concurrent.CompletableFuture;
12
13 import static de.juplo.kafka.ApplicationTests.PARTITIONS;
14 import static de.juplo.kafka.ApplicationTests.TOPIC;
15 import static org.awaitility.Awaitility.*;
16
17
18 @ExtendWith(SpringExtension.class)
19 @EmbeddedKafka(
20   topics = TOPIC,
21   partitions = PARTITIONS,
22   brokerProperties = {
23     "transaction.state.log.replication.factor=1",
24     "transaction.state.log.min.isr=1" })
25 @Slf4j
26 public class ApplicationTests
27 {
28         static final String TOPIC = "FOO";
29         static final int PARTITIONS = 10;
30
31   @Value("${spring.embedded.kafka.brokers}")
32   String bootstrapServers;
33
34         @Test
35         void testConcurrentProducers() throws Exception
36         {
37     SimpleProducer instanceA = new SimpleProducer(
38       bootstrapServers,
39       TOPIC,
40       "A");
41     SimpleProducer instanceB = new SimpleProducer(
42       bootstrapServers,
43       TOPIC,
44       "B");
45
46     CompletableFuture<Long> result;
47
48     result = instanceA.send("a","message #1");
49                 await("Sending of 1. message for A is completed")
50                                 .atMost(Duration.ofSeconds(5))
51                                 .until(() -> result.isDone());
52         }
53 }