8bf623ab939e69d836486a37021c5020a7a0af78
[demos/kafka/training] / src / test / java / de / juplo / kafka / ApplicationTests.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.assertj.core.api.Assertions;
5 import org.junit.jupiter.api.*;
6 import org.junit.jupiter.api.extension.ExtendWith;
7 import org.springframework.beans.factory.annotation.Value;
8 import org.springframework.kafka.test.context.EmbeddedKafka;
9 import org.springframework.test.context.junit.jupiter.SpringExtension;
10
11 import java.util.concurrent.CompletableFuture;
12
13 import static de.juplo.kafka.ApplicationTests.PARTITIONS;
14 import static de.juplo.kafka.ApplicationTests.TOPIC;
15
16
17 @ExtendWith(SpringExtension.class)
18 @EmbeddedKafka(
19   topics = TOPIC,
20   partitions = PARTITIONS,
21   brokerProperties = {
22     "transaction.state.log.replication.factor=1",
23     "transaction.state.log.min.isr=1" })
24 @Slf4j
25 public class ApplicationTests
26 {
27         static final String TOPIC = "FOO";
28         static final int PARTITIONS = 10;
29
30   @Value("${spring.embedded.kafka.brokers}")
31   String bootstrapServers;
32
33         @Test
34         void testConcurrentProducers() throws Exception
35         {
36     SimpleProducer instanceA = new SimpleProducer(
37       bootstrapServers,
38       TOPIC,
39       "A");
40     SimpleProducer instanceB = new SimpleProducer(
41       bootstrapServers,
42       TOPIC,
43       "B");
44
45     CompletableFuture<Long> result;
46
47     instanceA.begin();
48     result = instanceA.send("a","message #1");
49     Assertions.assertThat(result.isCompletedExceptionally()).isFalse();
50
51     instanceB.begin();
52     result = instanceB.send("b","message #1");
53     Assertions.assertThat(result.isCompletedExceptionally()).isFalse();
54
55     result = instanceA.send("a","message #2");
56     Assertions.assertThat(result.isCompletedExceptionally()).isTrue();
57
58     result = instanceB.send("b","message #2");
59     Assertions.assertThat(result.isCompletedExceptionally()).isFalse();
60   }
61 }