WIP
authorKai Moritz <kai@juplo.de>
Sun, 20 Aug 2023 09:25:03 +0000 (11:25 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 20 Aug 2023 09:25:03 +0000 (11:25 +0200)
src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java

index 6b9669f..694b67f 100644 (file)
@@ -13,6 +13,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.mock.mockito.MockBean;
 import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
@@ -44,13 +45,19 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
   @MockBean
   KafkaServicesApplicationRunner kafkaServicesApplicationRunner;
 
-
   @BeforeAll
-  public static void prepareChatRoomChannelConsumer(
+  public static void sendAndLoadStoredData(,
+      @Autowired KafkaTemplate<String, String> messageTemplate,
       @Autowired Consumer chatRoomChannelConsumer,
       @Autowired ThreadPoolTaskExecutor taskExecutor,
-  @Autowired ChatRoomChannel chatRoomChannel)
+      @Autowired ChatRoomChannel chatRoomChannel)
   {
+    send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
+
     List<TopicPartition> assignedPartitions = List.of(new TopicPartition(TOPIC, 2));
     chatRoomChannelConsumer.assign(assignedPartitions);
     chatRoomChannel.onPartitionsAssigned(assignedPartitions);
@@ -63,27 +70,16 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
         });
   }
 
-  @BeforeAll
-  public static void sendStoredData(
-      @Autowired ShardingStrategy shardingStrategy,
-      @Autowired KafkaTemplate<String, String> messageTemplate)
-  {
-    UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
-    int shard = shardingStrategy.selectShard(chatRoomId);
-    send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
-  }
-
   static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
   {
     ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
     record.headers().add("__TypeId__", typeId.getBytes());
-    kafkaTemplate
-        .send(record)
-        .thenAccept(result -> log.info("Sent {}={} to partition {}", key, value, result.getRecordMetadata().partition()));
+    SendResult<String, String> result = kafkaTemplate.send(record).join();
+    log.info(
+        "Sent {}={} to {}",
+        key,
+        value,
+        new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition())));
   }
 
   @AfterAll