From 6f8bd42a20085f186a27d2aeac8aedaf05a7e746 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 20 Apr 2023 16:57:31 +0200 Subject: [PATCH] NEU --- .../chat/backend/ChatBackendProperties.java | 2 +- .../kafka/KafkaServicesConfiguration.java | 30 ++++++++++++++----- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index 73fa719e..cb5684c2 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -34,7 +34,7 @@ public class ChatBackendProperties @Setter public static class KafkaServicesProperties { - private String clientId; + private String clientIdPrefix; private String bootstrapServers = ":9092"; private String topic = "test"; private int numPartitions = 2; diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index 63f7dbf4..b0e77761 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -2,7 +2,6 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ChatHome; -import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; import org.apache.kafka.clients.consumer.Consumer; @@ -71,11 +70,17 @@ public class KafkaServicesConfiguration @Bean Producer chatRoomChannelProducer( Properties defaultProducerProperties, + ChatBackendProperties chatBackendProperties, IntegerSerializer integerSerializer, JsonSerializer chatRoomSerializer) { + Map properties = new HashMap<>(); + defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ProducerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER"); return new KafkaProducer<>( - defaultProducerProperties, + properties, integerSerializer, chatRoomSerializer); } @@ -99,11 +104,15 @@ public class KafkaServicesConfiguration @Bean Consumer chatRoomChannelConsumer( Properties defaultConsumerProperties, + ChatBackendProperties chatBackendProperties, IntegerDeserializer integerDeserializer, JsonDeserializer chatRoomDeserializer) { Map properties = new HashMap<>(); defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ConsumerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER"); properties.put( ConsumerConfig.GROUP_ID_CONFIG, "chat_room_channel"); @@ -156,11 +165,17 @@ public class KafkaServicesConfiguration @Bean Producer chatMessageChannelProducer( Properties defaultProducerProperties, + ChatBackendProperties chatBackendProperties, StringSerializer stringSerializer, JsonSerializer messageSerializer) { + Map properties = new HashMap<>(); + defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ProducerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_PRODUCER"); return new KafkaProducer<>( - defaultProducerProperties, + properties, stringSerializer, messageSerializer); } @@ -184,11 +199,15 @@ public class KafkaServicesConfiguration @Bean Consumer chatMessageChannelConsumer( Properties defaultConsumerProperties, + ChatBackendProperties chatBackendProperties, StringDeserializer stringDeserializer, JsonDeserializer messageDeserializer) { Map properties = new HashMap<>(); defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ConsumerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER"); properties.put( ConsumerConfig.GROUP_ID_CONFIG, "chat_message_channel"); @@ -221,9 +240,6 @@ public class KafkaServicesConfiguration Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties) { Properties properties = new Properties(); - properties.setProperty( - ProducerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientId()); properties.setProperty( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, chatBackendProperties.getKafka().getBootstrapServers()); @@ -239,7 +255,7 @@ public class KafkaServicesConfiguration chatBackendProperties.getKafka().getBootstrapServers()); properties.setProperty( ConsumerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientId()); + chatBackendProperties.getKafka().getClientIdPrefix()); properties.setProperty( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); -- 2.20.1