package de.juplo.kafka.chat.backend;
import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
properties = {
"chat.backend.services=kafka",
- "chat.backend.kafka.client-id=TEST",
+ "chat.backend.kafka.client-id-PREFIX=TEST",
"chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
- "chat.backend.kafka.chatrooms-topic=" + CHATROOMS_TOPIC,
- "chat.backend.kafka.messages-topic=" + MESSAGES_TOPIC,
+ "chat.backend.kafka.chatroom-channel-topic=" + CHATROOMS_TOPIC,
+ "chat.backend.kafka.message-channel-topic=" + MESSAGES_TOPIC,
"chat.backend.kafka.num-partitions=3" })
@EmbeddedKafka(topics = { CHATROOMS_TOPIC, MESSAGES_TOPIC }, partitions = 3)
class KafkaConfigurationIT extends AbstractConfigurationIT
{
- final static String CHATROOMS_TOPIC = "TEST_CHAT_ROOMS";
- final static String MESSAGES_TOPIC = "TEST_CHAT_MESSAGES";
+ final static String CHATROOMS_TOPIC = "TEST_CHATROOM_CHANNEL";
+ final static String MESSAGES_TOPIC = "TEST_MESSAGE_CHANNEL";
@BeforeAll
public static void test(
{
UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
int shard = shardingStrategy.selectShard(chatRoomId);
- chatRoomTemplate.send(CHATROOMS_TOPIC, null,"{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": " + shard + ", \"name\": \"FOO\" }");
- messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }");
- messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }");
- messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }");
- messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }");
+ send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": " + shard + ", \"name\": \"FOO\" }", "create");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "message");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "message");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "message");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "message");
+ }
+
+ static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
+ {
+ ProducerRecord<String, String> record = new ProducerRecord<>(MESSAGES_TOPIC, key, value);
+ record.headers().add("__TypeId__", typeId.getBytes());
+ kafkaTemplate.send(record);
}
}