1 package de.juplo.kafka.chat.backend;
3 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
4 import org.apache.kafka.clients.producer.ProducerRecord;
5 import org.junit.jupiter.api.BeforeAll;
6 import org.springframework.beans.factory.annotation.Autowired;
7 import org.springframework.boot.test.context.SpringBootTest;
8 import org.springframework.kafka.core.KafkaTemplate;
9 import org.springframework.kafka.test.context.EmbeddedKafka;
11 import java.util.UUID;
13 import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.CHATROOMS_TOPIC;
14 import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.MESSAGES_TOPIC;
18 webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
20 "chat.backend.services=kafka",
21 "chat.backend.kafka.client-id-PREFIX=TEST",
22 "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
23 "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
24 "chat.backend.kafka.chatroom-channel-topic=" + CHATROOMS_TOPIC,
25 "chat.backend.kafka.message-channel-topic=" + MESSAGES_TOPIC,
26 "chat.backend.kafka.num-partitions=3" })
27 @EmbeddedKafka(topics = { CHATROOMS_TOPIC, MESSAGES_TOPIC }, partitions = 3)
28 class KafkaConfigurationIT extends AbstractConfigurationIT
30 final static String CHATROOMS_TOPIC = "TEST_CHATROOM_CHANNEL";
31 final static String MESSAGES_TOPIC = "TEST_MESSAGE_CHANNEL";
34 public static void test(
35 @Autowired ShardingStrategy shardingStrategy,
36 @Autowired KafkaTemplate<String, String> messageTemplate,
37 @Autowired KafkaTemplate<Integer, String> chatRoomTemplate)
39 UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
40 int shard = shardingStrategy.selectShard(chatRoomId);
41 send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": " + shard + ", \"name\": \"FOO\" }", "create");
42 send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "message");
43 send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "message");
44 send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "message");
45 send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "message");
48 static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
50 ProducerRecord<String, String> record = new ProducerRecord<>(MESSAGES_TOPIC, key, value);
51 record.headers().add("__TypeId__", typeId.getBytes());
52 kafkaTemplate.send(record);