WIP:refactor: Refined channel-states, introduced `ChannelState` -- ALIGN
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / KafkaConfigurationIT.java
1 package de.juplo.kafka.chat.backend;
2
3 import de.juplo.kafka.chat.backend.implementation.kafka.*;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.Consumer;
6 import org.junit.jupiter.api.AfterAll;
7 import org.junit.jupiter.api.BeforeAll;
8 import org.springframework.beans.factory.annotation.Autowired;
9 import org.springframework.boot.test.context.SpringBootTest;
10 import org.springframework.boot.test.context.TestConfiguration;
11 import org.springframework.boot.test.mock.mockito.MockBean;
12 import org.springframework.context.annotation.Import;
13 import org.springframework.kafka.core.KafkaTemplate;
14 import org.springframework.kafka.test.context.EmbeddedKafka;
15
16 import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.DATA_TOPIC;
17 import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.INFO_TOPIC;
18
19
20 @SpringBootTest(
21     webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
22     properties = {
23         "spring.main.allow-bean-definition-overriding=true",
24         "chat.backend.services=kafka",
25         "chat.backend.kafka.client-id-PREFIX=TEST",
26         "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
27         "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
28         "chat.backend.kafka.info-channel-topic=" + INFO_TOPIC,
29         "chat.backend.kafka.data-channel-topic=" + DATA_TOPIC,
30         "chat.backend.kafka.num-partitions=10",
31         })
32 @EmbeddedKafka(
33     topics = { INFO_TOPIC, DATA_TOPIC },
34     partitions = 10)
35 @Slf4j
36 class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
37 {
38   final static String INFO_TOPIC = "KAFKA_CONFIGURATION_IT_INFO_CHANNEL";
39   final static String DATA_TOPIC = "KAFKA_CONFIGURATION_IT_DATA_CHANNEL";
40
41   @MockBean
42   KafkaServicesApplicationRunner kafkaServicesApplicationRunner;
43
44   @BeforeAll
45   public static void sendAndLoadStoredData(
46       @Autowired KafkaTemplate<String, String> messageTemplate,
47       @Autowired ChannelTaskRunner channelTaskRunner)
48   {
49     KafkaTestUtils.sendAndLoadStoredData(
50         messageTemplate,
51         INFO_TOPIC,
52         DATA_TOPIC,
53         channelTaskRunner);
54   }
55
56   @AfterAll
57   static void joinConsumerTasks(
58       @Autowired Consumer dataChannelConsumer,
59       @Autowired Consumer infoChannelConsumer,
60       @Autowired ChannelTaskRunner channelTaskRunner)
61       throws InterruptedException
62   {
63     dataChannelConsumer.wakeup();
64     infoChannelConsumer.wakeup();
65     channelTaskRunner.joinChannels();
66   }
67
68
69   @TestConfiguration
70   @Import(KafkaTestUtils.KafkaTestConfiguration.class)
71   static class KafkaConfigurationITConfiguration
72   {
73   }
74 }