FIX
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / KafkaConfigurationIT.java
1 package de.juplo.kafka.chat.backend;
2
3 import de.juplo.kafka.chat.backend.implementation.kafka.*;
4 import lombok.extern.slf4j.Slf4j;
5 import org.junit.jupiter.api.AfterAll;
6 import org.junit.jupiter.api.BeforeAll;
7 import org.springframework.beans.factory.annotation.Autowired;
8 import org.springframework.boot.test.context.SpringBootTest;
9 import org.springframework.boot.test.context.TestConfiguration;
10 import org.springframework.boot.test.mock.mockito.MockBean;
11 import org.springframework.context.annotation.Import;
12 import org.springframework.kafka.core.KafkaTemplate;
13 import org.springframework.kafka.test.context.EmbeddedKafka;
14
15 import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.DATA_TOPIC;
16 import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.INFO_TOPIC;
17
18
19 @SpringBootTest(
20     webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
21     properties = {
22         "spring.main.allow-bean-definition-overriding=true",
23         "chat.backend.services=kafka",
24         "chat.backend.kafka.client-id-PREFIX=TEST",
25         "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
26         "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
27         "chat.backend.kafka.info-channel-topic=" + INFO_TOPIC,
28         "chat.backend.kafka.data-channel-topic=" + DATA_TOPIC,
29         "chat.backend.kafka.num-partitions=10",
30         })
31 @EmbeddedKafka(
32     topics = { INFO_TOPIC, DATA_TOPIC },
33     partitions = 10)
34 @Slf4j
35 class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
36 {
37   final static String INFO_TOPIC = "KAFKA_CONFIGURATION_IT_INFO_CHANNEL";
38   final static String DATA_TOPIC = "KAFKA_CONFIGURATION_IT_DATA_CHANNEL";
39
40   @MockBean
41   KafkaServicesApplicationRunner kafkaServicesApplicationRunner;
42
43   @BeforeAll
44   public static void sendAndLoadStoredData(
45       @Autowired KafkaTemplate<String, String> messageTemplate,
46       @Autowired ConsumerTaskRunner consumerTaskRunner)
47   {
48     KafkaTestUtils.sendAndLoadStoredData(
49         messageTemplate,
50         INFO_TOPIC,
51         DATA_TOPIC,
52         consumerTaskRunner);
53   }
54
55   @AfterAll
56   static void joinConsumerTasks(
57       @Autowired ConsumerTaskRunner consumerTaskRunner)
58       throws InterruptedException
59   {
60     KafkaTestUtils.joinConsumerTasks(consumerTaskRunner);
61   }
62
63
64   @TestConfiguration
65   @Import(KafkaTestUtils.KafkaTestConfiguration.class)
66   static class KafkaConfigurationITConfiguration
67   {
68   }
69 }