package de.juplo.kafka.chat.backend;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
+import org.junit.jupiter.api.BeforeAll;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
+import java.util.UUID;
+
import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.TOPIC;
"chat.backend.services=kafka",
"chat.backend.kafka.client-id=TEST",
"chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+ "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"chat.backend.kafka.topic=" + TOPIC,
"chat.backend.kafka.num-partitions=3" })
@EmbeddedKafka(topics = TOPIC, partitions = 3)
class KafkaConfigurationIT extends AbstractConfigurationIT
{
final static String TOPIC = "TEST";
+
+ @BeforeAll
+ public static void test(
+ @Autowired ShardingStrategy shardingStrategy,
+ @Autowired KafkaTemplate<String, String> messageTemplate,
+ @Autowired KafkaTemplate<Integer, String> chatRoomTemplate)
+ {
+ UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
+ int shard = shardingStrategy.selectShard(chatRoomId);
+ chatRoomTemplate.send(TOPIC, null,"{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": " + shard + ", \"name\": \"FOO\" }");
+ messageTemplate.send(TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }");
+ messageTemplate.send(TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }");
+ messageTemplate.send(TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }");
+ messageTemplate.send(TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }");
+ }
}