refactor: Separated channels for data and info -- Refactored/aligned code
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / KafkaConfigurationIT.java
1 package de.juplo.kafka.chat.backend;
2
3 import de.juplo.kafka.chat.backend.implementation.kafka.*;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.producer.ProducerRecord;
6 import org.apache.kafka.common.TopicPartition;
7 import org.junit.jupiter.api.AfterAll;
8 import org.junit.jupiter.api.BeforeAll;
9 import org.springframework.beans.factory.annotation.Autowired;
10 import org.springframework.boot.context.properties.EnableConfigurationProperties;
11 import org.springframework.boot.test.context.SpringBootTest;
12 import org.springframework.boot.test.context.TestConfiguration;
13 import org.springframework.boot.test.mock.mockito.MockBean;
14 import org.springframework.context.annotation.Bean;
15 import org.springframework.context.annotation.Import;
16 import org.springframework.kafka.core.KafkaTemplate;
17 import org.springframework.kafka.support.SendResult;
18 import org.springframework.kafka.test.context.EmbeddedKafka;
19
20 import java.util.List;
21
22 import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.DATA_TOPIC;
23 import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.INFO_TOPIC;
24
25
26 @SpringBootTest(
27     webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
28     properties = {
29         "spring.main.allow-bean-definition-overriding=true",
30         "chat.backend.services=kafka",
31         "chat.backend.kafka.client-id-PREFIX=TEST",
32         "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
33         "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
34         "chat.backend.kafka.info-channel-topic=" + INFO_TOPIC,
35         "chat.backend.kafka.data-channel-topic=" + DATA_TOPIC,
36         "chat.backend.kafka.num-partitions=10",
37         })
38 @EmbeddedKafka(
39     topics = { INFO_TOPIC, DATA_TOPIC },
40     partitions = 10)
41 @Slf4j
42 class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
43 {
44   final static String INFO_TOPIC = "KAFKA_CONFIGURATION_IT_INFO_CHANNEL";
45   final static String DATA_TOPIC = "KAFKA_CONFIGURATION_IT_DATA_CHANNEL";
46
47   @MockBean
48   KafkaServicesApplicationRunner kafkaServicesApplicationRunner;
49
50   @BeforeAll
51   public static void sendAndLoadStoredData(
52       @Autowired KafkaTemplate<String, String> messageTemplate,
53       @Autowired ConsumerTaskRunner consumerTaskRunner)
54   {
55     send(messageTemplate, INFO_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created");
56     send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
57     send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
58     send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
59     send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
60
61     consumerTaskRunner.executeConsumerTasks();
62   }
63
64   static void send(
65       KafkaTemplate<String, String> kafkaTemplate,
66       String topic,
67       String key,
68       String value,
69       String typeId)
70   {
71     ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
72     record.headers().add("__TypeId__", typeId.getBytes());
73     SendResult<String, String> result = kafkaTemplate.send(record).join();
74     log.info(
75         "Sent {}={} to {}",
76         key,
77         value,
78         new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
79   }
80   @AfterAll
81   static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner)
82   {
83     consumerTaskRunner.joinConsumerTasks();
84   }
85
86
87   @TestConfiguration
88   @EnableConfigurationProperties(ChatBackendProperties.class)
89   @Import(KafkaServicesConfiguration.class)
90   static class KafkaConfigurationITConfiguration
91   {
92     @Bean
93     ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor(
94         DataChannel dataChannel)
95     {
96       return consumer ->
97       {
98         List<TopicPartition> assignedPartitions =
99             List.of(new TopicPartition(DATA_TOPIC, 2));
100         consumer.assign(assignedPartitions);
101         dataChannel.onPartitionsAssigned(assignedPartitions);
102       };
103     }
104   }
105 }