NG
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / KafkaConfigurationIT.java
index e300c0d..4e9ad23 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.chat.backend;
 
 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.jupiter.api.BeforeAll;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
@@ -9,22 +10,25 @@ import org.springframework.kafka.test.context.EmbeddedKafka;
 
 import java.util.UUID;
 
-import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.TOPIC;
+import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.CHATROOMS_TOPIC;
+import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.MESSAGES_TOPIC;
 
 
 @SpringBootTest(
     webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
     properties = {
         "chat.backend.services=kafka",
-        "chat.backend.kafka.client-id=TEST",
+        "chat.backend.kafka.client-id-PREFIX=TEST",
         "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
         "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
-        "chat.backend.kafka.topic=" + TOPIC,
+        "chat.backend.kafka.chatroom-channel-topic=" + CHATROOMS_TOPIC,
+        "chat.backend.kafka.message-channel-topic=" + MESSAGES_TOPIC,
         "chat.backend.kafka.num-partitions=3" })
-@EmbeddedKafka(topics = TOPIC, partitions = 3)
+@EmbeddedKafka(topics = { CHATROOMS_TOPIC, MESSAGES_TOPIC }, partitions = 3)
 class KafkaConfigurationIT extends AbstractConfigurationIT
 {
-  final static String TOPIC = "TEST";
+  final static String CHATROOMS_TOPIC = "TEST_CHATROOM_CHANNEL";
+  final static String MESSAGES_TOPIC = "TEST_MESSAGE_CHANNEL";
 
   @BeforeAll
   public static void test(
@@ -34,10 +38,17 @@ class KafkaConfigurationIT extends AbstractConfigurationIT
   {
     UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
     int shard = shardingStrategy.selectShard(chatRoomId);
-    chatRoomTemplate.send(TOPIC, null,"{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": " + shard + ", \"name\": \"FOO\" }");
-    messageTemplate.send(TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }");
-    messageTemplate.send(TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }");
-    messageTemplate.send(TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }");
-    messageTemplate.send(TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }");
+    send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": " + shard + ", \"name\": \"FOO\" }", "create");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "message");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "message");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "message");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "message");
+  }
+
+  static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
+  {
+    ProducerRecord<String, String> record = new ProducerRecord<>(MESSAGES_TOPIC, key, value);
+    record.headers().add("__TypeId__", typeId.getBytes());
+    kafkaTemplate.send(record);
   }
 }