test: Fixed `AbstractConfigurationIT#testPutMessageInNewChatRoom()`
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / KafkaConfigurationIT.java
1 package de.juplo.kafka.chat.backend;
2
3 import de.juplo.kafka.chat.backend.implementation.kafka.ChannelTaskExecutor;
4 import de.juplo.kafka.chat.backend.implementation.kafka.DataChannel;
5 import de.juplo.kafka.chat.backend.implementation.kafka.KafkaTestUtils;
6 import de.juplo.kafka.chat.backend.implementation.kafka.WorkAssignor;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.junit.jupiter.api.BeforeAll;
11 import org.springframework.beans.factory.annotation.Autowired;
12 import org.springframework.boot.test.context.SpringBootTest;
13 import org.springframework.boot.test.context.TestConfiguration;
14 import org.springframework.context.annotation.Bean;
15 import org.springframework.context.annotation.Import;
16 import org.springframework.kafka.core.KafkaTemplate;
17 import org.springframework.kafka.test.context.EmbeddedKafka;
18 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
19
20 import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.DATA_TOPIC;
21 import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.INFO_TOPIC;
22
23
24 @SpringBootTest(
25     webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
26     properties = {
27         "spring.main.allow-bean-definition-overriding=true",
28         "chat.backend.services=kafka",
29         "chat.backend.kafka.client-id-PREFIX=TEST",
30         "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
31         "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
32         "chat.backend.kafka.info-channel-topic=" + INFO_TOPIC,
33         "chat.backend.kafka.data-channel-topic=" + DATA_TOPIC,
34         "chat.backend.kafka.num-partitions=10",
35         })
36 @EmbeddedKafka(
37     topics = { INFO_TOPIC, DATA_TOPIC },
38     partitions = 10)
39 @Slf4j
40 class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
41 {
42   final static String INFO_TOPIC = "KAFKA_CONFIGURATION_IT_INFO_CHANNEL";
43   final static String DATA_TOPIC = "KAFKA_CONFIGURATION_IT_DATA_CHANNEL";
44
45   @BeforeAll
46   static void sendAndLoadStoredData(
47       @Autowired KafkaTemplate<String, String> messageTemplate,
48       @Autowired ChannelTaskExecutor dataChannelTaskExecutor)
49   {
50     KafkaTestUtils.sendAndLoadStoredData(
51         messageTemplate,
52         INFO_TOPIC,
53         DATA_TOPIC);
54
55     // The initialization of the data-channel must happen,
56     // after the messages were sent into the topic of the
57     // test-cluster.
58     // Otherwise, the initial loading of the data might be
59     // completed, before these messages arrive, so that
60     // they are ignored and the state is never restored.
61     dataChannelTaskExecutor.executeChannelTask();
62   }
63
64
65   @TestConfiguration
66   @Import(KafkaTestUtils.KafkaTestConfiguration.class)
67   static class KafkaConfigurationITConfiguration
68   {
69     /**
70      * The definition of this bean has to be overruled, so
71      * that the configuration of the `initMethod`, which
72      * has to be called explicitly, _after_ the messages
73      * were sent to and received by the test-culster, can
74      * be dropped.
75      */
76     @Bean(destroyMethod = "join")
77     ChannelTaskExecutor dataChannelTaskExecutor(
78         ThreadPoolTaskExecutor taskExecutor,
79         DataChannel dataChannel,
80         Consumer<String, AbstractMessageTo> dataChannelConsumer,
81         WorkAssignor dataChannelWorkAssignor)
82     {
83       return new ChannelTaskExecutor(
84           taskExecutor,
85           dataChannel,
86           dataChannelConsumer,
87           dataChannelWorkAssignor);
88     }
89   }
90 }