Clock clock)
{
return new ChatRoomChannel(
- properties.getKafka().getMessageChannelTopic(),
+ properties.getKafka().getChatRoomChannelTopic(),
chatRoomChannelProducer,
chatRoomChannelConsumer,
zoneId,
chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
properties.put(
ConsumerConfig.GROUP_ID_CONFIG,
- "chat_message_channel");
+ "chatroom_channel");
return new KafkaConsumer<>(
properties,
stringDeserializer,
String typeMappings ()
{
return
- "create:" + CommandCreateChatRoomTo.class.getCanonicalName() + "," +
- "message:" + EventChatMessageReceivedTo.class.getCanonicalName();
+ "command_create_chatroom:" + CommandCreateChatRoomTo.class.getCanonicalName() + "," +
+ "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
}
@Bean
import java.util.UUID;
-import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.CHATROOMS_TOPIC;
-import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.MESSAGES_TOPIC;
+import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.TOPIC;
@SpringBootTest(
"chat.backend.kafka.client-id-PREFIX=TEST",
"chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
- "chat.backend.kafka.chatroom-channel-topic=" + CHATROOMS_TOPIC,
- "chat.backend.kafka.message-channel-topic=" + MESSAGES_TOPIC,
- "chat.backend.kafka.num-partitions=3" })
-@EmbeddedKafka(topics = { CHATROOMS_TOPIC, MESSAGES_TOPIC }, partitions = 3)
+ "chat.backend.kafka.message-channel-topic=" + TOPIC,
+ "chat.backend.kafka.num-partitions=10" })
+@EmbeddedKafka(topics = { TOPIC }, partitions = 10)
class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
{
- final static String CHATROOMS_TOPIC = "TEST_CHATROOM_CHANNEL";
- final static String MESSAGES_TOPIC = "TEST_MESSAGE_CHANNEL";
+ final static String TOPIC = "TEST_MESSAGE_CHANNEL";
@BeforeAll
public static void test(
static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
{
- ProducerRecord<String, String> record = new ProducerRecord<>(MESSAGES_TOPIC, key, value);
+ ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
record.headers().add("__TypeId__", typeId.getBytes());
kafkaTemplate.send(record);
}