projects
/
demos
/
kafka
/
chat
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (from parent 1:
4facd23
)
NEU
author
Kai Moritz
<kai@juplo.de>
Thu, 20 Apr 2023 14:57:31 +0000
(16:57 +0200)
committer
Kai Moritz
<kai@juplo.de>
Thu, 20 Apr 2023 14:57:31 +0000
(16:57 +0200)
src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
patch
|
blob
|
history
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java
patch
|
blob
|
history
diff --git
a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
index
73fa719
..
cb5684c
100644
(file)
--- a/
src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
+++ b/
src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
@@
-34,7
+34,7
@@
public class ChatBackendProperties
@Setter
public static class KafkaServicesProperties
{
@Setter
public static class KafkaServicesProperties
{
- private String clientId;
+ private String clientId
Prefix
;
private String bootstrapServers = ":9092";
private String topic = "test";
private int numPartitions = 2;
private String bootstrapServers = ":9092";
private String topic = "test";
private int numPartitions = 2;
diff --git
a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java
b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java
index
63f7dbf
..
b0e7776
100644
(file)
--- a/
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java
+++ b/
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java
@@
-2,7
+2,6
@@
package de.juplo.kafka.chat.backend.persistence.kafka;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.domain.ChatHome;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
import org.apache.kafka.clients.consumer.Consumer;
import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
import org.apache.kafka.clients.consumer.Consumer;
@@
-71,11
+70,17
@@
public class KafkaServicesConfiguration
@Bean
Producer<Integer, ChatRoomTo> chatRoomChannelProducer(
Properties defaultProducerProperties,
@Bean
Producer<Integer, ChatRoomTo> chatRoomChannelProducer(
Properties defaultProducerProperties,
+ ChatBackendProperties chatBackendProperties,
IntegerSerializer integerSerializer,
JsonSerializer<ChatRoomTo> chatRoomSerializer)
{
IntegerSerializer integerSerializer,
JsonSerializer<ChatRoomTo> chatRoomSerializer)
{
+ Map<String, Object> properties = new HashMap<>();
+ defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+ properties.put(
+ ProducerConfig.CLIENT_ID_CONFIG,
+ chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
return new KafkaProducer<>(
return new KafkaProducer<>(
-
defaultProducerP
roperties,
+
p
roperties,
integerSerializer,
chatRoomSerializer);
}
integerSerializer,
chatRoomSerializer);
}
@@
-99,11
+104,15
@@
public class KafkaServicesConfiguration
@Bean
Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer(
Properties defaultConsumerProperties,
@Bean
Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer(
Properties defaultConsumerProperties,
+ ChatBackendProperties chatBackendProperties,
IntegerDeserializer integerDeserializer,
JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
{
Map<String, Object> properties = new HashMap<>();
defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
IntegerDeserializer integerDeserializer,
JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
{
Map<String, Object> properties = new HashMap<>();
defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+ properties.put(
+ ConsumerConfig.CLIENT_ID_CONFIG,
+ chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER");
properties.put(
ConsumerConfig.GROUP_ID_CONFIG,
"chat_room_channel");
properties.put(
ConsumerConfig.GROUP_ID_CONFIG,
"chat_room_channel");
@@
-156,11
+165,17
@@
public class KafkaServicesConfiguration
@Bean
Producer<String, MessageTo> chatMessageChannelProducer(
Properties defaultProducerProperties,
@Bean
Producer<String, MessageTo> chatMessageChannelProducer(
Properties defaultProducerProperties,
+ ChatBackendProperties chatBackendProperties,
StringSerializer stringSerializer,
JsonSerializer<MessageTo> messageSerializer)
{
StringSerializer stringSerializer,
JsonSerializer<MessageTo> messageSerializer)
{
+ Map<String, Object> properties = new HashMap<>();
+ defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+ properties.put(
+ ProducerConfig.CLIENT_ID_CONFIG,
+ chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_PRODUCER");
return new KafkaProducer<>(
return new KafkaProducer<>(
-
defaultProducerP
roperties,
+
p
roperties,
stringSerializer,
messageSerializer);
}
stringSerializer,
messageSerializer);
}
@@
-184,11
+199,15
@@
public class KafkaServicesConfiguration
@Bean
Consumer<String, MessageTo> chatMessageChannelConsumer(
Properties defaultConsumerProperties,
@Bean
Consumer<String, MessageTo> chatMessageChannelConsumer(
Properties defaultConsumerProperties,
+ ChatBackendProperties chatBackendProperties,
StringDeserializer stringDeserializer,
JsonDeserializer<MessageTo> messageDeserializer)
{
Map<String, Object> properties = new HashMap<>();
defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
StringDeserializer stringDeserializer,
JsonDeserializer<MessageTo> messageDeserializer)
{
Map<String, Object> properties = new HashMap<>();
defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+ properties.put(
+ ConsumerConfig.CLIENT_ID_CONFIG,
+ chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER");
properties.put(
ConsumerConfig.GROUP_ID_CONFIG,
"chat_message_channel");
properties.put(
ConsumerConfig.GROUP_ID_CONFIG,
"chat_message_channel");
@@
-221,9
+240,6
@@
public class KafkaServicesConfiguration
Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
{
Properties properties = new Properties();
Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
{
Properties properties = new Properties();
- properties.setProperty(
- ProducerConfig.CLIENT_ID_CONFIG,
- chatBackendProperties.getKafka().getClientId());
properties.setProperty(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
chatBackendProperties.getKafka().getBootstrapServers());
properties.setProperty(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
chatBackendProperties.getKafka().getBootstrapServers());
@@
-239,7
+255,7
@@
public class KafkaServicesConfiguration
chatBackendProperties.getKafka().getBootstrapServers());
properties.setProperty(
ConsumerConfig.CLIENT_ID_CONFIG,
chatBackendProperties.getKafka().getBootstrapServers());
properties.setProperty(
ConsumerConfig.CLIENT_ID_CONFIG,
- chatBackendProperties.getKafka().getClientId());
+ chatBackendProperties.getKafka().getClientId
Prefix
());
properties.setProperty(
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
"false");
properties.setProperty(
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
"false");