WIP
[demos/kafka/training] / src / test / java / de / juplo / kafka / ApplicationTests.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.assertj.core.api.Assertions;
5 import org.junit.jupiter.api.*;
6 import org.junit.jupiter.api.extension.ExtendWith;
7 import org.springframework.beans.factory.annotation.Value;
8 import org.springframework.kafka.test.context.EmbeddedKafka;
9 import org.springframework.test.context.junit.jupiter.SpringExtension;
10
11 import java.time.Duration;
12 import java.util.concurrent.CompletableFuture;
13
14 import static de.juplo.kafka.ApplicationTests.PARTITIONS;
15 import static de.juplo.kafka.ApplicationTests.TOPIC;
16 import static org.awaitility.Awaitility.*;
17
18
19 @ExtendWith(SpringExtension.class)
20 @EmbeddedKafka(
21   topics = TOPIC,
22   partitions = PARTITIONS,
23   brokerProperties = {
24     "transaction.state.log.replication.factor=1",
25     "transaction.state.log.min.isr=1" })
26 @Slf4j
27 public class ApplicationTests
28 {
29         static final String TOPIC = "FOO";
30         static final int PARTITIONS = 10;
31
32   @Value("${spring.embedded.kafka.brokers}")
33   String bootstrapServers;
34
35         @Test
36         void testConcurrentProducers() throws Exception
37         {
38     SimpleProducer instanceA = new SimpleProducer(
39       bootstrapServers,
40       TOPIC,
41       "FOO");
42
43     instanceA.begin();
44     CompletableFuture<Long> resultA1 = instanceA.send("foo","message #1");
45                 await("Sending of 1. message for A is completed")
46                                 .atMost(Duration.ofSeconds(5))
47                                 .until(() -> resultA1.isDone());
48     Assertions.assertThat(resultA1.isCompletedExceptionally()).isFalse();
49
50     SimpleProducer instanceB = new SimpleProducer(
51       bootstrapServers,
52       TOPIC,
53       "FOO");
54
55     instanceB.begin();
56     CompletableFuture<Long> resultB1 = instanceB.send("foo","message #1");
57     await("Sending of 1. message for B is completed")
58       .atMost(Duration.ofSeconds(5))
59       .until(() -> resultB1.isDone());
60     Assertions.assertThat(resultB1.isCompletedExceptionally()).isFalse();
61
62     CompletableFuture<Long> resultA2 = instanceA.send("foo","message #2");
63     await("Sending of 2. message for A is completed")
64       .atMost(Duration.ofSeconds(5))
65       .until(() -> resultA2.isDone());
66     Assertions.assertThat(resultA2.isCompletedExceptionally()).isTrue();
67
68     CompletableFuture<Long> resultB2 = instanceB.send("foo","message #2");
69     await("Sending of 2. message for B is completed")
70       .atMost(Duration.ofSeconds(5))
71       .until(() -> resultB2.isDone());
72     Assertions.assertThat(resultB2.isCompletedExceptionally()).isFalse();
73   }
74 }