+ final static String CHATROOMS_TOPIC = "TEST_CHATROOM_CHANNEL";
+ final static String MESSAGES_TOPIC = "TEST_MESSAGE_CHANNEL";
+
+ @BeforeAll
+ public static void test(
+ @Autowired ShardingStrategy shardingStrategy,
+ @Autowired KafkaTemplate<String, String> messageTemplate,
+ @Autowired KafkaTemplate<Integer, String> chatRoomTemplate)
+ {
+ UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
+ int shard = shardingStrategy.selectShard(chatRoomId);
+ send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": " + shard + ", \"name\": \"FOO\" }", "create");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "message");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "message");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "message");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "message");
+ }
+
+ static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
+ {
+ ProducerRecord<String, String> record = new ProducerRecord<>(MESSAGES_TOPIC, key, value);
+ record.headers().add("__TypeId__", typeId.getBytes());
+ kafkaTemplate.send(record);
+ }