WIP:ALIGN
authorKai Moritz <kai@juplo.de>
Fri, 15 Sep 2023 11:08:56 +0000 (13:08 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 15 Sep 2023 11:17:53 +0000 (13:17 +0200)
src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java

index 865dda7..56695df 100644 (file)
@@ -52,54 +52,14 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
       @Autowired KafkaTemplate<String, String> messageTemplate,
       @Autowired ConsumerTaskRunner consumerTaskRunner)
   {
-    send(messageTemplate, INFO_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created");
-    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
-    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
-    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
-    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
-
-    consumerTaskRunner.run();
+    KafkaTestUtils.sendAndLoadStoredData(
+        messageTemplate,
+        consumerTaskRunner);
   }
 
-  static void send(
-      KafkaTemplate<String, String> kafkaTemplate,
-      String topic,
-      String key,
-      String value,
-      String typeId)
-  {
-    ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
-    record.headers().add("__TypeId__", typeId.getBytes());
-    SendResult<String, String> result = kafkaTemplate.send(record).join();
-    log.info(
-        "Sent {}={} to {}",
-        key,
-        value,
-        new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
-  }
   @AfterAll
   static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner)
   {
-    consumerTaskRunner.joinConsumerTasks();
-  }
-
-
-  @TestConfiguration
-  @EnableConfigurationProperties(ChatBackendProperties.class)
-  @Import(KafkaServicesConfiguration.class)
-  static class KafkaConfigurationITConfiguration
-  {
-    @Bean
-    WorkAssignor dataChannelWorkAssignor(
-        DataChannel dataChannel)
-    {
-      return consumer ->
-      {
-        List<TopicPartition> assignedPartitions =
-            List.of(new TopicPartition(DATA_TOPIC, 2));
-        consumer.assign(assignedPartitions);
-        dataChannel.onPartitionsAssigned(assignedPartitions);
-      };
-    }
+    KafkaTestUtils.joinConsumerTasks(consumerTaskRunner);
   }
 }
index 6d52c25..33d2600 100644 (file)
@@ -44,12 +44,12 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
 
   @BeforeAll
   static void sendAndLoadStoredData(
-      @Autowired ConsumerTaskRunner consumerTaskRunner,
-      @Autowired KafkaTemplate<String, String> messageTemplate)
+      @Autowired KafkaTemplate<String, String> messageTemplate,
+      @Autowired ConsumerTaskRunner consumerTaskRunner)
   {
     KafkaTestUtils.sendAndLoadStoredData(
-        consumerTaskRunner,
-        messageTemplate);
+        messageTemplate,
+        consumerTaskRunner);
   }
 
   @AfterAll
index a00f2c1..76196e3 100644 (file)
@@ -24,7 +24,7 @@ public class KafkaTestUtils
   @TestConfiguration
   @EnableConfigurationProperties(ChatBackendProperties.class)
   @Import(KafkaServicesConfiguration.class)
-  static class KafkaTestConfiguration
+  public  static class KafkaTestConfiguration
   {
     @Bean
     WorkAssignor dataChannelWorkAssignor(DataChannel dataChannel)
@@ -46,9 +46,9 @@ public class KafkaTestUtils
   }
 
 
-  static void sendAndLoadStoredData(
-      ConsumerTaskRunner consumerTaskRunner,
-      KafkaTemplate<String, String> messageTemplate)
+  public static void sendAndLoadStoredData(
+      KafkaTemplate<String, String> messageTemplate,
+      ConsumerTaskRunner consumerTaskRunner)
   {
     send(messageTemplate, INFO_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created");
     send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
@@ -76,7 +76,7 @@ public class KafkaTestUtils
         new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
   }
 
-  static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner)
+  public static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner)
   {
     consumerTaskRunner.joinConsumerTasks();
   }