WIP
authorKai Moritz <kai@juplo.de>
Sun, 20 Aug 2023 08:34:16 +0000 (10:34 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 20 Aug 2023 08:51:27 +0000 (10:51 +0200)
src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java
src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java

index 0bcfc9e..9c80f5d 100644 (file)
@@ -36,8 +36,7 @@ public class ChatBackendProperties
   {
     private String clientIdPrefix;
     private String bootstrapServers = ":9092";
-    private String chatroomChannelTopic = "chatroom_channel";
-    private String messageChannelTopic = "message_channel";
+    private String chatRoomChannelTopic = "message_channel";
     private int numPartitions = 2;
   }
 
index dda8748..804acaa 100644 (file)
@@ -63,7 +63,7 @@ public class KafkaServicesConfiguration
       Clock clock)
   {
     return new ChatRoomChannel(
-        properties.getKafka().getMessageChannelTopic(),
+        properties.getKafka().getChatRoomChannelTopic(),
         chatRoomChannelProducer,
         chatRoomChannelConsumer,
         zoneId,
@@ -121,7 +121,7 @@ public class KafkaServicesConfiguration
         chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
     properties.put(
         ConsumerConfig.GROUP_ID_CONFIG,
-        "chat_message_channel");
+        "chatroom_channel");
     return new KafkaConsumer<>(
         properties,
         stringDeserializer,
@@ -150,8 +150,8 @@ public class KafkaServicesConfiguration
   String typeMappings ()
   {
     return
-        "create:" +  CommandCreateChatRoomTo.class.getCanonicalName() + "," +
-        "message:" + EventChatMessageReceivedTo.class.getCanonicalName();
+        "command_create_chatroom:" +  CommandCreateChatRoomTo.class.getCanonicalName() + "," +
+        "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
   }
 
   @Bean
index e56c660..e343174 100644 (file)
@@ -10,8 +10,7 @@ import org.springframework.kafka.test.context.EmbeddedKafka;
 
 import java.util.UUID;
 
-import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.CHATROOMS_TOPIC;
-import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.MESSAGES_TOPIC;
+import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.TOPIC;
 
 
 @SpringBootTest(
@@ -21,14 +20,12 @@ import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.MESSAGES_TOPIC;
         "chat.backend.kafka.client-id-PREFIX=TEST",
         "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
         "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
-        "chat.backend.kafka.chatroom-channel-topic=" + CHATROOMS_TOPIC,
-        "chat.backend.kafka.message-channel-topic=" + MESSAGES_TOPIC,
-        "chat.backend.kafka.num-partitions=3" })
-@EmbeddedKafka(topics = { CHATROOMS_TOPIC, MESSAGES_TOPIC }, partitions = 3)
+        "chat.backend.kafka.message-channel-topic=" + TOPIC,
+        "chat.backend.kafka.num-partitions=10" })
+@EmbeddedKafka(topics = { TOPIC }, partitions = 10)
 class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
 {
-  final static String CHATROOMS_TOPIC = "TEST_CHATROOM_CHANNEL";
-  final static String MESSAGES_TOPIC = "TEST_MESSAGE_CHANNEL";
+  final static String TOPIC = "TEST_MESSAGE_CHANNEL";
 
   @BeforeAll
   public static void test(
@@ -46,7 +43,7 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
 
   static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
   {
-    ProducerRecord<String, String> record = new ProducerRecord<>(MESSAGES_TOPIC, key, value);
+    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
     record.headers().add("__TypeId__", typeId.getBytes());
     kafkaTemplate.send(record);
   }