1 package de.juplo.kafka.chat.backend;
3 import de.juplo.kafka.chat.backend.implementation.kafka.*;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.Consumer;
6 import org.junit.jupiter.api.AfterAll;
7 import org.junit.jupiter.api.BeforeAll;
8 import org.springframework.beans.factory.annotation.Autowired;
9 import org.springframework.boot.test.context.SpringBootTest;
10 import org.springframework.boot.test.context.TestConfiguration;
11 import org.springframework.boot.test.mock.mockito.MockBean;
12 import org.springframework.context.annotation.Import;
13 import org.springframework.kafka.core.KafkaTemplate;
14 import org.springframework.kafka.test.context.EmbeddedKafka;
16 import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.DATA_TOPIC;
17 import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.INFO_TOPIC;
// Annotation attributes and class header of the Kafka-backed configuration test.
// The lines that open the annotations are not part of this excerpt.
// NOTE(review): presumably webEnvironment and the property strings belong to
// @SpringBootTest and topics belongs to @EmbeddedKafka (both imported above) — confirm.
// The test asks for a web environment on a random port.
21 webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
// Property overrides in "key=value" form follow.
23 "spring.main.allow-bean-definition-overriding=true",
// Sets chat.backend.services to the value "kafka".
24 "chat.backend.services=kafka",
// NOTE(review): this key mixes kebab-case with an upper-case suffix; presumably
// relaxed property binding still maps it to the client-id prefix — confirm.
25 "chat.backend.kafka.client-id-PREFIX=TEST",
// Both bootstrap-server settings resolve the same placeholder.
// NOTE(review): presumably the placeholder is published by the embedded broker — confirm.
26 "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
27 "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
// The topic names are built from the INFO_TOPIC / DATA_TOPIC constants, i.e. the
// same values that are listed in the topics attribute below.
28 "chat.backend.kafka.info-channel-topic=" + INFO_TOPIC,
29 "chat.backend.kafka.data-channel-topic=" + DATA_TOPIC,
30 "chat.backend.kafka.num-partitions=10",
33 topics = { INFO_TOPIC, DATA_TOPIC },
// The test class inherits from AbstractConfigurationWithShardingIT, which is
// declared outside this excerpt.
36 class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
// Topic names used by this test. They are package-private constants and are
// statically imported by this file itself, so they can be referenced without
// qualification.
38 final static String INFO_TOPIC = "KAFKA_CONFIGURATION_IT_INFO_CHANNEL";
39 final static String DATA_TOPIC = "KAFKA_CONFIGURATION_IT_DATA_CHANNEL";
// Field holding a KafkaServicesApplicationRunner; it is not referenced in the
// visible lines of this class.
// NOTE(review): its annotation is not visible here — presumably @MockBean
// (imported above); confirm.
42 KafkaServicesApplicationRunner kafkaServicesApplicationRunner;
// Static set-up hook that delegates to KafkaTestUtils.sendAndLoadStoredData.
// Receives an injected KafkaTemplate<String, String> and the ChannelTaskRunner
// as parameters.
// NOTE(review): the method annotation and the arguments of the delegated call
// are not visible here — presumably @BeforeAll (imported above); confirm.
45 public static void sendAndLoadStoredData(
46 @Autowired KafkaTemplate<String, String> messageTemplate,
47 @Autowired ChannelTaskRunner channelTaskRunner)
49 KafkaTestUtils.sendAndLoadStoredData(
// Static tear-down hook: wakes up both channel consumers and then joins the
// channel tasks through the ChannelTaskRunner.
// NOTE(review): the method annotation is not visible here — presumably
// @AfterAll (imported above); confirm.
57 static void joinConsumerTasks(
// NOTE(review): Consumer is used as a raw type for both parameters; check the
// generic types of the beans before parameterizing, since that may affect injection.
58 @Autowired Consumer dataChannelConsumer,
59 @Autowired Consumer infoChannelConsumer,
60 @Autowired ChannelTaskRunner channelTaskRunner)
// joinChannels() is the only visible call that can raise this checked exception.
61 throws InterruptedException
// wakeup() is called on both consumers before joining.
// NOTE(review): presumably this aborts the blocking poll of each consumer task
// so that joinChannels() can return — confirm against the task implementation.
63 dataChannelConsumer.wakeup();
64 infoChannelConsumer.wakeup();
65 channelTaskRunner.joinChannels();
// Nested static configuration class that imports
// KafkaTestUtils.KafkaTestConfiguration. Its body is not part of this excerpt.
// NOTE(review): presumably also annotated with @TestConfiguration (imported
// above) — confirm.
70 @Import(KafkaTestUtils.KafkaTestConfiguration.class)
71 static class KafkaConfigurationITConfiguration