NG
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / KafkaConfigurationIT.java
1 package de.juplo.kafka.chat.backend;
2
3 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
4 import org.apache.kafka.clients.producer.ProducerRecord;
5 import org.junit.jupiter.api.BeforeAll;
6 import org.springframework.beans.factory.annotation.Autowired;
7 import org.springframework.boot.test.context.SpringBootTest;
8 import org.springframework.kafka.core.KafkaTemplate;
9 import org.springframework.kafka.test.context.EmbeddedKafka;
10
11 import java.util.UUID;
12
13 import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.CHATROOMS_TOPIC;
14 import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.MESSAGES_TOPIC;
15
16
17 @SpringBootTest(
18     webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
19     properties = {
20         "chat.backend.services=kafka",
21         "chat.backend.kafka.client-id-PREFIX=TEST",
22         "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
23         "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
24         "chat.backend.kafka.chatroom-channel-topic=" + CHATROOMS_TOPIC,
25         "chat.backend.kafka.message-channel-topic=" + MESSAGES_TOPIC,
26         "chat.backend.kafka.num-partitions=3" })
27 @EmbeddedKafka(topics = { CHATROOMS_TOPIC, MESSAGES_TOPIC }, partitions = 3)
28 class KafkaConfigurationIT extends AbstractConfigurationIT
29 {
30   final static String CHATROOMS_TOPIC = "TEST_CHATROOM_CHANNEL";
31   final static String MESSAGES_TOPIC = "TEST_MESSAGE_CHANNEL";
32
33   @BeforeAll
34   public static void test(
35       @Autowired ShardingStrategy shardingStrategy,
36       @Autowired KafkaTemplate<String, String> messageTemplate,
37       @Autowired KafkaTemplate<Integer, String> chatRoomTemplate)
38   {
39     UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
40     int shard = shardingStrategy.selectShard(chatRoomId);
41     send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": " + shard + ", \"name\": \"FOO\" }", "create");
42     send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "message");
43     send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "message");
44     send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "message");
45     send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "message");
46   }
47
48   static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
49   {
50     ProducerRecord<String, String> record = new ProducerRecord<>(MESSAGES_TOPIC, key, value);
51     record.headers().add("__TypeId__", typeId.getBytes());
52     kafkaTemplate.send(record);
53   }
54 }