X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2FKafkaConfigurationIT.java;h=4e9ad2399191013b6adb9b2502ed22e1915a5038;hb=1416ccc8a9eae999201dbf7c77c4d4906fc9fc24;hp=822f5303a8fc1c5f13f00070bcd0dee9a91b6cab;hpb=4facd23b68de068bd23457627ce0d1a85af60c02;p=demos%2Fkafka%2Fchat diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java index 822f5303..4e9ad239 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java @@ -1,6 +1,7 @@ package de.juplo.kafka.chat.backend; import de.juplo.kafka.chat.backend.domain.ShardingStrategy; +import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.BeforeAll; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @@ -17,17 +18,17 @@ import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.MESSAGES_TOPIC; webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = { "chat.backend.services=kafka", - "chat.backend.kafka.client-id=TEST", + "chat.backend.kafka.client-id-PREFIX=TEST", "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", - "chat.backend.kafka.chatrooms-topic=" + CHATROOMS_TOPIC, - "chat.backend.kafka.messages-topic=" + MESSAGES_TOPIC, + "chat.backend.kafka.chatroom-channel-topic=" + CHATROOMS_TOPIC, + "chat.backend.kafka.message-channel-topic=" + MESSAGES_TOPIC, "chat.backend.kafka.num-partitions=3" }) @EmbeddedKafka(topics = { CHATROOMS_TOPIC, MESSAGES_TOPIC }, partitions = 3) class KafkaConfigurationIT extends AbstractConfigurationIT { - final static String CHATROOMS_TOPIC = "TEST_CHAT_ROOMS"; - final static String MESSAGES_TOPIC = "TEST_CHAT_MESSAGES"; + final static String CHATROOMS_TOPIC = "TEST_CHATROOM_CHANNEL"; + final static String MESSAGES_TOPIC = "TEST_MESSAGE_CHANNEL"; @BeforeAll public static void test( @@ -37,10 +38,17 @@ class KafkaConfigurationIT extends AbstractConfigurationIT { UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); int shard = shardingStrategy.selectShard(chatRoomId); - chatRoomTemplate.send(CHATROOMS_TOPIC, null,"{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": " + shard + ", \"name\": \"FOO\" }"); - messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }"); - messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }"); - messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }"); - messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }"); + send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": " + shard + ", \"name\": \"FOO\" }", "create"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "message"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "message"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "message"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "message"); + } + + static void send(KafkaTemplate kafkaTemplate, String key, String value, String typeId) + { + ProducerRecord record = new ProducerRecord<>(MESSAGES_TOPIC, key, value); + record.headers().add("__TypeId__", typeId.getBytes()); + kafkaTemplate.send(record); } }