From 1f5f5e1cb99a32fac026d071a9e5279978c0bc83 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 20 Aug 2023 10:34:16 +0200 Subject: [PATCH] WIP --- .../kafka/chat/backend/ChatBackendProperties.java | 3 +-- .../kafka/KafkaServicesConfiguration.java | 8 ++++---- .../kafka/chat/backend/KafkaConfigurationIT.java | 15 ++++++--------- 3 files changed, 11 insertions(+), 15 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index 0bcfc9e5..9c80f5d6 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -36,8 +36,7 @@ public class ChatBackendProperties { private String clientIdPrefix; private String bootstrapServers = ":9092"; - private String chatroomChannelTopic = "chatroom_channel"; - private String messageChannelTopic = "message_channel"; + private String chatRoomChannelTopic = "message_channel"; private int numPartitions = 2; } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index dda8748e..804acaa6 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -63,7 +63,7 @@ public class KafkaServicesConfiguration Clock clock) { return new ChatRoomChannel( - properties.getKafka().getMessageChannelTopic(), + properties.getKafka().getChatRoomChannelTopic(), chatRoomChannelProducer, chatRoomChannelConsumer, zoneId, @@ -121,7 +121,7 @@ public class KafkaServicesConfiguration chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER"); properties.put( ConsumerConfig.GROUP_ID_CONFIG, - "chat_message_channel"); + "chatroom_channel"); return new KafkaConsumer<>( properties, stringDeserializer, @@ -150,8 +150,8 @@ public class KafkaServicesConfiguration String typeMappings () { return - "create:" + CommandCreateChatRoomTo.class.getCanonicalName() + "," + - "message:" + EventChatMessageReceivedTo.class.getCanonicalName(); + "command_create_chatroom:" + CommandCreateChatRoomTo.class.getCanonicalName() + "," + + "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName(); } @Bean diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java index e56c660d..e343174d 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java @@ -10,8 +10,7 @@ import org.springframework.kafka.test.context.EmbeddedKafka; import java.util.UUID; -import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.CHATROOMS_TOPIC; -import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.MESSAGES_TOPIC; +import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.TOPIC; @SpringBootTest( @@ -21,14 +20,12 @@ import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.MESSAGES_TOPIC; "chat.backend.kafka.client-id-PREFIX=TEST", "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", - "chat.backend.kafka.chatroom-channel-topic=" + CHATROOMS_TOPIC, - "chat.backend.kafka.message-channel-topic=" + MESSAGES_TOPIC, - "chat.backend.kafka.num-partitions=3" }) -@EmbeddedKafka(topics = { CHATROOMS_TOPIC, MESSAGES_TOPIC }, partitions = 3) + "chat.backend.kafka.message-channel-topic=" + TOPIC, + "chat.backend.kafka.num-partitions=10" }) +@EmbeddedKafka(topics = { TOPIC }, partitions = 10) class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT { - final static String CHATROOMS_TOPIC = "TEST_CHATROOM_CHANNEL"; - final static String MESSAGES_TOPIC = "TEST_MESSAGE_CHANNEL"; + final static String TOPIC = "TEST_MESSAGE_CHANNEL"; @BeforeAll public static void test( @@ -46,7 +43,7 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT static void send(KafkaTemplate kafkaTemplate, String key, String value, String typeId) { - ProducerRecord record = new ProducerRecord<>(MESSAGES_TOPIC, key, value); + ProducerRecord record = new ProducerRecord<>(TOPIC, key, value); record.headers().add("__TypeId__", typeId.getBytes()); kafkaTemplate.send(record); } -- 2.20.1