1 package de.juplo.kafka.chat.backend;
3 import de.juplo.kafka.chat.backend.implementation.kafka.ChannelTaskExecutor;
4 import de.juplo.kafka.chat.backend.implementation.kafka.DataChannel;
5 import de.juplo.kafka.chat.backend.implementation.kafka.KafkaTestUtils;
6 import de.juplo.kafka.chat.backend.implementation.kafka.WorkAssignor;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.junit.jupiter.api.BeforeAll;
11 import org.springframework.beans.factory.annotation.Autowired;
12 import org.springframework.boot.test.context.SpringBootTest;
13 import org.springframework.boot.test.context.TestConfiguration;
14 import org.springframework.context.annotation.Bean;
15 import org.springframework.context.annotation.Import;
16 import org.springframework.kafka.core.KafkaTemplate;
17 import org.springframework.kafka.test.context.EmbeddedKafka;
18 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
20 import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.DATA_TOPIC;
21 import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.INFO_TOPIC;
25 webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
27 "spring.main.allow-bean-definition-overriding=true",
28 "chat.backend.services=kafka",
29 "chat.backend.kafka.client-id-PREFIX=TEST",
30 "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
31 "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
32 "chat.backend.kafka.info-channel-topic=" + INFO_TOPIC,
33 "chat.backend.kafka.data-channel-topic=" + DATA_TOPIC,
34 "chat.backend.kafka.num-partitions=10",
37 topics = { INFO_TOPIC, DATA_TOPIC },
40 class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
42 final static String INFO_TOPIC = "KAFKA_CONFIGURATION_IT_INFO_CHANNEL";
43 final static String DATA_TOPIC = "KAFKA_CONFIGURATION_IT_DATA_CHANNEL";
46 static void sendAndLoadStoredData(
47 @Autowired KafkaTemplate<String, String> messageTemplate,
48 @Autowired ChannelTaskExecutor dataChannelTaskExecutor)
50 KafkaTestUtils.sendAndLoadStoredData(
55 // The initialization of the data-channel must happen
56 // after the messages were sent into the topic of the
58 // Otherwise, the initial loading of the data might be
59 // completed before these messages arrive, so that
60 // they are ignored and the state is never restored.
61 dataChannelTaskExecutor.executeChannelTask();
66 @Import(KafkaTestUtils.KafkaTestConfiguration.class)
67 static class KafkaConfigurationITConfiguration
70 * The definition of this bean has to be overridden, so
71 * that the configuration of the `initMethod`, which
72 * has to be called explicitly _after_ the messages
73 * were sent to and received by the test-cluster, can
76 @Bean(destroyMethod = "join")
77 ChannelTaskExecutor dataChannelTaskExecutor(
78 ThreadPoolTaskExecutor taskExecutor,
79 DataChannel dataChannel,
80 Consumer<String, AbstractMessageTo> dataChannelConsumer,
81 WorkAssignor dataChannelWorkAssignor)
83 return new ChannelTaskExecutor(
87 dataChannelWorkAssignor);