581658e8f5ecd9894ae02d03c68fa328f992373f
[demos/kafka/training] / src / test / java / de / juplo / kafka / ApplicationTests.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.common.errors.InvalidProducerEpochException;
5 import org.junit.jupiter.api.*;
6 import org.junit.jupiter.api.extension.ExtendWith;
7 import org.springframework.beans.factory.annotation.Value;
8 import org.springframework.kafka.test.context.EmbeddedKafka;
9 import org.springframework.test.context.junit.jupiter.SpringExtension;
10
11 import java.time.Duration;
12 import java.util.concurrent.CompletableFuture;
13
14 import static de.juplo.kafka.ApplicationTests.PARTITIONS;
15 import static de.juplo.kafka.ApplicationTests.TOPIC;
16 import static org.assertj.core.api.Assertions.*;
17 import static org.awaitility.Awaitility.*;
18
19
20 @ExtendWith(SpringExtension.class)
21 @EmbeddedKafka(
22   topics = TOPIC,
23   partitions = PARTITIONS,
24   brokerProperties = {
25     "transaction.state.log.replication.factor=1",
26     "transaction.state.log.min.isr=1" })
27 @Slf4j
28 public class ApplicationTests
29 {
30         static final String TOPIC = "FOO";
31         static final int PARTITIONS = 10;
32
33   @Value("${spring.embedded.kafka.brokers}")
34   String bootstrapServers;
35
36         @Test
37         void testConcurrentProducers() throws Exception
38         {
39     SimpleProducer instanceA = new SimpleProducer(
40       bootstrapServers,
41       TOPIC,
42       "FOO",
43       "A");
44
45     instanceA.begin();
46     CompletableFuture<Long> resultA1 = instanceA.send("a","message #1");
47                 await("Sending of 1. message for A is completed")
48                                 .atMost(Duration.ofSeconds(5))
49                                 .until(() -> resultA1.isDone());
50     assertThat(resultA1.isCompletedExceptionally()).isFalse();
51
52     SimpleProducer instanceB = new SimpleProducer(
53       bootstrapServers,
54       TOPIC,
55       "FOO",
56       "B");
57
58     instanceB.begin();
59     CompletableFuture<Long> resultB1 = instanceB.send("b","message #1");
60     await("Sending of 1. message for B is completed")
61       .atMost(Duration.ofSeconds(5))
62       .until(() -> resultB1.isDone());
63     assertThat(resultB1.isCompletedExceptionally()).isFalse();
64
65     CompletableFuture<Long> resultA2 = instanceA.send("a","message #2");
66     await("Sending of 2. message for A is completed")
67       .atMost(Duration.ofSeconds(5))
68       .until(() -> resultA2.isDone());
69     assertThat(resultA2.isCompletedExceptionally()).isTrue();
70
71     CompletableFuture<Long> resultB2 = instanceB.send("b","message #2");
72     await("Sending of 2. message for B is completed")
73       .atMost(Duration.ofSeconds(5))
74       .until(() -> resultB2.isDone());
75     assertThat(resultB2.isCompletedExceptionally()).isFalse();
76
77     assertThatThrownBy(() -> instanceA.commit()).isInstanceOf(InvalidProducerEpochException.class);
78     assertThatCode(() -> instanceB.commit()).doesNotThrowAnyException();
79   }
80 }