NEU
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaServicesConfiguration.java
index b0e7776..4350779 100644 (file)
@@ -58,7 +58,7 @@ public class KafkaServicesConfiguration
       Clock clock)
   {
     return new ChatRoomChannel(
-        properties.getKafka().getTopic(),
+        properties.getKafka().getChatroomChannelTopic(),
         chatRoomChannelProducer,
         chatRoomChannelConsumer,
         shardingStrategy,
@@ -112,7 +112,7 @@ public class KafkaServicesConfiguration
     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
     properties.put(
         ConsumerConfig.CLIENT_ID_CONFIG,
-        chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER");
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
     properties.put(
         ConsumerConfig.GROUP_ID_CONFIG,
         "chat_room_channel");
@@ -155,7 +155,7 @@ public class KafkaServicesConfiguration
       ZoneId zoneId)
   {
     return new ChatMessageChannel(
-        properties.getKafka().getTopic(),
+        properties.getKafka().getMessageChannelTopic(),
         chatMessageChannelProducer,
         chatMessageChannelConsumer,
         zoneId,
@@ -253,9 +253,6 @@ public class KafkaServicesConfiguration
     properties.setProperty(
         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
         chatBackendProperties.getKafka().getBootstrapServers());
-    properties.setProperty(
-        ConsumerConfig.CLIENT_ID_CONFIG,
-        chatBackendProperties.getKafka().getClientIdPrefix());
     properties.setProperty(
         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
         "false");