NEU
authorKai Moritz <kai@juplo.de>
Thu, 20 Apr 2023 14:57:31 +0000 (16:57 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 20 Apr 2023 14:57:31 +0000 (16:57 +0200)
src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java

index 73fa719..cb5684c 100644 (file)
@@ -34,7 +34,7 @@ public class ChatBackendProperties
   @Setter
   public static class KafkaServicesProperties
   {
-    private String clientId;
+    private String clientIdPrefix;
     private String bootstrapServers = ":9092";
     private String topic = "test";
     private int numPartitions = 2;
index 63f7dbf..b0e7776 100644 (file)
@@ -2,7 +2,6 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -71,11 +70,17 @@ public class KafkaServicesConfiguration
   @Bean
   Producer<Integer, ChatRoomTo>  chatRoomChannelProducer(
       Properties defaultProducerProperties,
+      ChatBackendProperties chatBackendProperties,
       IntegerSerializer integerSerializer,
       JsonSerializer<ChatRoomTo> chatRoomSerializer)
   {
+    Map<String, Object> properties = new HashMap<>();
+    defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+    properties.put(
+        ProducerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
     return new KafkaProducer<>(
-        defaultProducerProperties,
+        properties,
         integerSerializer,
         chatRoomSerializer);
   }
@@ -99,11 +104,15 @@ public class KafkaServicesConfiguration
   @Bean
   Consumer<Integer, ChatRoomTo>  chatRoomChannelConsumer(
       Properties defaultConsumerProperties,
+      ChatBackendProperties chatBackendProperties,
       IntegerDeserializer integerDeserializer,
       JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
   {
     Map<String, Object> properties = new HashMap<>();
     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+    properties.put(
+        ConsumerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER");
     properties.put(
         ConsumerConfig.GROUP_ID_CONFIG,
         "chat_room_channel");
@@ -156,11 +165,17 @@ public class KafkaServicesConfiguration
   @Bean
   Producer<String, MessageTo>  chatMessageChannelProducer(
       Properties defaultProducerProperties,
+      ChatBackendProperties chatBackendProperties,
       StringSerializer stringSerializer,
       JsonSerializer<MessageTo> messageSerializer)
   {
+    Map<String, Object> properties = new HashMap<>();
+    defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+    properties.put(
+        ProducerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_PRODUCER");
     return new KafkaProducer<>(
-        defaultProducerProperties,
+        properties,
         stringSerializer,
         messageSerializer);
   }
@@ -184,11 +199,15 @@ public class KafkaServicesConfiguration
   @Bean
   Consumer<String, MessageTo>  chatMessageChannelConsumer(
       Properties defaultConsumerProperties,
+      ChatBackendProperties chatBackendProperties,
       StringDeserializer stringDeserializer,
       JsonDeserializer<MessageTo> messageDeserializer)
   {
     Map<String, Object> properties = new HashMap<>();
     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+    properties.put(
+        ConsumerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER");
     properties.put(
         ConsumerConfig.GROUP_ID_CONFIG,
         "chat_message_channel");
@@ -221,9 +240,6 @@ public class KafkaServicesConfiguration
   Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
   {
     Properties properties = new Properties();
-    properties.setProperty(
-        ProducerConfig.CLIENT_ID_CONFIG,
-        chatBackendProperties.getKafka().getClientId());
     properties.setProperty(
         ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
         chatBackendProperties.getKafka().getBootstrapServers());
@@ -239,7 +255,7 @@ public class KafkaServicesConfiguration
         chatBackendProperties.getKafka().getBootstrapServers());
     properties.setProperty(
         ConsumerConfig.CLIENT_ID_CONFIG,
-        chatBackendProperties.getKafka().getClientId());
+        chatBackendProperties.getKafka().getClientIdPrefix());
     properties.setProperty(
         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
         "false");