From: Kai Moritz <kai@juplo.de>
Date: Fri, 15 Sep 2023 11:08:56 +0000 (+0200)
Subject: WIP:ALIGN
X-Git-Tag: rebase--2023-09-15--16-33~5
X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=5d3cdebd2669993976614e8221920be736f7a2fa;p=demos%2Fkafka%2Fchat

WIP:ALIGN
---

diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
index 865dda76..56695df6 100644
--- a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
+++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
@@ -52,54 +52,14 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
       @Autowired KafkaTemplate<String, String> messageTemplate,
       @Autowired ConsumerTaskRunner consumerTaskRunner)
   {
-    send(messageTemplate, INFO_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created");
-    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
-    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
-    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
-    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
-
-    consumerTaskRunner.run();
+    KafkaTestUtils.sendAndLoadStoredData(
+        messageTemplate,
+        consumerTaskRunner);
   }
 
-  static void send(
-      KafkaTemplate<String, String> kafkaTemplate,
-      String topic,
-      String key,
-      String value,
-      String typeId)
-  {
-    ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
-    record.headers().add("__TypeId__", typeId.getBytes());
-    SendResult<String, String> result = kafkaTemplate.send(record).join();
-    log.info(
-        "Sent {}={} to {}",
-        key,
-        value,
-        new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
-  }
   @AfterAll
   static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner)
   {
-    consumerTaskRunner.joinConsumerTasks();
-  }
-
-
-  @TestConfiguration
-  @EnableConfigurationProperties(ChatBackendProperties.class)
-  @Import(KafkaServicesConfiguration.class)
-  static class KafkaConfigurationITConfiguration
-  {
-    @Bean
-    WorkAssignor dataChannelWorkAssignor(
-        DataChannel dataChannel)
-    {
-      return consumer ->
-      {
-        List<TopicPartition> assignedPartitions =
-            List.of(new TopicPartition(DATA_TOPIC, 2));
-        consumer.assign(assignedPartitions);
-        dataChannel.onPartitionsAssigned(assignedPartitions);
-      };
-    }
+    KafkaTestUtils.joinConsumerTasks(consumerTaskRunner);
   }
 }
diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
index 6d52c259..33d26003 100644
--- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
+++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
@@ -44,12 +44,12 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
 
   @BeforeAll
   static void sendAndLoadStoredData(
-      @Autowired ConsumerTaskRunner consumerTaskRunner,
-      @Autowired KafkaTemplate<String, String> messageTemplate)
+      @Autowired KafkaTemplate<String, String> messageTemplate,
+      @Autowired ConsumerTaskRunner consumerTaskRunner)
   {
     KafkaTestUtils.sendAndLoadStoredData(
-        consumerTaskRunner,
-        messageTemplate);
+        messageTemplate,
+        consumerTaskRunner);
   }
 
   @AfterAll
diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java
index a00f2c1f..76196e37 100644
--- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java
+++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java
@@ -24,7 +24,7 @@ public class KafkaTestUtils
   @TestConfiguration
   @EnableConfigurationProperties(ChatBackendProperties.class)
   @Import(KafkaServicesConfiguration.class)
-  static class KafkaTestConfiguration
+  public  static class KafkaTestConfiguration
   {
     @Bean
     WorkAssignor dataChannelWorkAssignor(DataChannel dataChannel)
@@ -46,9 +46,9 @@ public class KafkaTestUtils
   }
 
 
-  static void sendAndLoadStoredData(
-      ConsumerTaskRunner consumerTaskRunner,
-      KafkaTemplate<String, String> messageTemplate)
+  public static void sendAndLoadStoredData(
+      KafkaTemplate<String, String> messageTemplate,
+      ConsumerTaskRunner consumerTaskRunner)
   {
     send(messageTemplate, INFO_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created");
     send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
@@ -76,7 +76,7 @@ public class KafkaTestUtils
         new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
   }
 
-  static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner)
+  public static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner)
   {
     consumerTaskRunner.joinConsumerTasks();
   }