c8610347e092a3563f50b260443456a88de0f5fd
[demos/kafka/training] / src / test / java / de / juplo / kafka / ApplicationTests.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.common.errors.InvalidProducerEpochException;
5 import org.junit.jupiter.api.*;
6 import org.junit.jupiter.api.extension.ExtendWith;
7 import org.springframework.beans.factory.annotation.Value;
8 import org.springframework.kafka.test.context.EmbeddedKafka;
9 import org.springframework.test.context.junit.jupiter.SpringExtension;
10
11 import java.time.Duration;
12 import java.util.concurrent.CompletableFuture;
13
14 import static de.juplo.kafka.ApplicationTests.PARTITIONS;
15 import static de.juplo.kafka.ApplicationTests.TOPIC;
16 import static org.assertj.core.api.Assertions.assertThat;
17 import static org.assertj.core.api.Assertions.assertThatThrownBy;
18 import static org.awaitility.Awaitility.*;
19
20
21 @ExtendWith(SpringExtension.class)
22 @EmbeddedKafka(
23   topics = TOPIC,
24   partitions = PARTITIONS,
25   brokerProperties = {
26     "transaction.state.log.replication.factor=1",
27     "transaction.state.log.min.isr=1" })
28 @Slf4j
29 public class ApplicationTests
30 {
31         static final String TOPIC = "FOO";
32         static final int PARTITIONS = 10;
33
34   @Value("${spring.embedded.kafka.brokers}")
35   String bootstrapServers;
36
37         @Test
38         void testConcurrentProducers() throws Exception
39         {
40     SimpleProducer instanceA = new SimpleProducer(
41       bootstrapServers,
42       TOPIC,
43       "FOO");
44
45     instanceA.begin();
46     CompletableFuture<Long> resultA1 = instanceA.send("a","message #1");
47                 await("Sending of 1. message for A is completed")
48                                 .atMost(Duration.ofSeconds(5))
49                                 .until(() -> resultA1.isDone());
50     assertThat(resultA1.isCompletedExceptionally()).isFalse();
51
52     SimpleProducer instanceB = new SimpleProducer(
53       bootstrapServers,
54       TOPIC,
55       "FOO");
56
57     instanceB.begin();
58     CompletableFuture<Long> resultB1 = instanceB.send("b","message #1");
59     await("Sending of 1. message for B is completed")
60       .atMost(Duration.ofSeconds(5))
61       .until(() -> resultB1.isDone());
62     assertThat(resultB1.isCompletedExceptionally()).isFalse();
63
64     CompletableFuture<Long> resultA2 = instanceA.send("a","message #2");
65     await("Sending of 2. message for A is completed")
66       .atMost(Duration.ofSeconds(5))
67       .until(() -> resultA2.isDone());
68     assertThat(resultA2.isCompletedExceptionally()).isTrue();
69
70     CompletableFuture<Long> resultB2 = instanceB.send("b","message #2");
71     await("Sending of 2. message for B is completed")
72       .atMost(Duration.ofSeconds(5))
73       .until(() -> resultB2.isDone());
74     assertThat(resultB2.isCompletedExceptionally()).isFalse();
75
76     assertThatThrownBy(() -> instanceA.commit()).isInstanceOf(InvalidProducerEpochException.class);
77     instanceB.commit();
78   }
79 }