WIP:KafkaChatHomeTest
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHomeTest.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.ChatBackendProperties;
4 import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTestBase;
5 import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.clients.consumer.Consumer;
8 import org.apache.kafka.clients.producer.Producer;
9 import org.apache.kafka.clients.producer.ProducerRecord;
10 import org.apache.kafka.common.TopicPartition;
11 import org.junit.jupiter.api.AfterAll;
12 import org.junit.jupiter.api.BeforeAll;
13 import org.springframework.beans.factory.annotation.Autowired;
14 import org.springframework.boot.test.context.TestConfiguration;
15 import org.springframework.context.annotation.Bean;
16 import org.springframework.kafka.core.KafkaTemplate;
17 import org.springframework.kafka.support.SendResult;
18 import org.springframework.kafka.test.context.EmbeddedKafka;
19 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
20
21 import java.time.Clock;
22 import java.time.ZoneId;
23 import java.util.List;
24 import java.util.concurrent.CompletableFuture;
25
26 import static de.juplo.kafka.chat.backend.persistence.kafka.KafkaChatHomeTest.TOPIC;
27
28
29 @EmbeddedKafka(topics = { TOPIC }, partitions = 10)
30 @Slf4j
31 public class KafkaChatHomeTest extends ChatHomeWithShardsTestBase
32 {
33   final static String TOPIC = "KAFKA_CHAT_HOME_TEST";
34
35   static CompletableFuture<Void> CONSUMER_JOB;
36
37
38   @TestConfiguration
39   static class Configuration
40   {
41     @Bean
42     KafkaChatHome chatHome(ChatRoomChannel chatRoomChannel)
43     {
44       return new KafkaChatHome(numShards(), chatRoomChannel);
45     }
46
47     @Bean
48     ChatRoomChannel chatRoomChannel(
49         Producer<String, AbstractMessageTo> chatRoomChannelProducer,
50         Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
51         ZoneId zoneId,
52         Clock clock)
53     {
54       return new ChatRoomChannel(
55           TOPIC,
56           chatRoomChannelProducer,
57           chatRoomChannelConsumer,
58           zoneId,
59           numShards(),
60           8,
61           clock);
62     }
63
64     Integer numShards()
65     {
66       return 10;
67     }
68   }
69   @BeforeAll
70   public static void sendAndLoadStoredData(
71       @Autowired KafkaTemplate<String, String> messageTemplate,
72       @Autowired Consumer chatRoomChannelConsumer,
73       @Autowired ThreadPoolTaskExecutor taskExecutor,
74       @Autowired ChatRoomChannel chatRoomChannel)
75   {
76     send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom");
77     send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
78     send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
79     send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
80     send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
81
82     List<TopicPartition> assignedPartitions = List.of(new TopicPartition(TOPIC, 2));
83     chatRoomChannelConsumer.assign(assignedPartitions);
84     chatRoomChannel.onPartitionsAssigned(assignedPartitions);
85     CONSUMER_JOB = taskExecutor
86         .submitCompletable(chatRoomChannel)
87         .exceptionally(e ->
88         {
89           log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
90           return null;
91         });
92   }
93
94   static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
95   {
96     ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
97     record.headers().add("__TypeId__", typeId.getBytes());
98     SendResult<String, String> result = kafkaTemplate.send(record).join();
99     log.info(
100         "Sent {}={} to {}",
101         key,
102         value,
103         new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
104   }
105
106   @AfterAll
107   static void joinConsumerJob(@Autowired Consumer chatRoomChannelConsumer)
108   {
109     log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
110     chatRoomChannelConsumer.wakeup();
111     log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
112     CONSUMER_JOB.join();
113     log.info("Joined the consumer of the ChatRoomChannel");
114   }
115 }