From: Kai Moritz Date: Thu, 20 Apr 2023 15:36:11 +0000 (+0200) Subject: NEU X-Git-Tag: rebase--2023-08-18~7 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=e400c437b999e1e92f2a52e3ef72306a9107aff9;p=demos%2Fkafka%2Fchat NEU --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java index 43ea3994..1925cc88 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java @@ -154,7 +154,7 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener @Override public void run() { - consumer.subscribe(List.of(topic)); + consumer.subscribe(List.of(topic), this); running = true; diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java new file mode 100644 index 00000000..43507792 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -0,0 +1,270 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.ChatBackendProperties; +import de.juplo.kafka.chat.backend.domain.ChatHome; +import de.juplo.kafka.chat.backend.domain.ShardingStrategy; +import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import java.time.Clock; +import java.time.ZoneId; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + + +@ConditionalOnProperty( + prefix = "chat.backend", + name = "services", + havingValue = "kafka") +@Configuration +public class KafkaServicesConfiguration +{ + @Bean + ChatHome kafkaChatHome( + ShardingStrategy shardingStrategy, + ChatMessageChannel chatMessageChannel) + { + return new KafkaChatHome(shardingStrategy, chatMessageChannel); + } + + @Bean + KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel) + { + return new KafkaChatRoomFactory(chatRoomChannel); + } + + @Bean + ChatRoomChannel chatRoomChannel( + ChatBackendProperties properties, + Producer chatRoomChannelProducer, + Consumer chatRoomChannelConsumer, + ShardingStrategy shardingStrategy, + ChatMessageChannel chatMessageChannel, + Clock clock) + { + return new ChatRoomChannel( + properties.getKafka().getChatroomChannelTopic(), + chatRoomChannelProducer, + chatRoomChannelConsumer, + shardingStrategy, + chatMessageChannel, + clock, + properties.getChatroomBufferSize()); + } + + @Bean + Producer chatRoomChannelProducer( + Properties defaultProducerProperties, + ChatBackendProperties chatBackendProperties, + IntegerSerializer integerSerializer, + JsonSerializer chatRoomSerializer) + { + Map properties = new HashMap<>(); + defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ProducerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER"); + return new KafkaProducer<>( + properties, + integerSerializer, + chatRoomSerializer); + } + + @Bean + IntegerSerializer integerSerializer() + { + return new IntegerSerializer(); + } + + @Bean + JsonSerializer chatRoomSerializer() + { + JsonSerializer serializer = new JsonSerializer<>(); + serializer.configure( + Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false), + false); + return serializer; + } + + @Bean + Consumer chatRoomChannelConsumer( + Properties defaultConsumerProperties, + ChatBackendProperties chatBackendProperties, + IntegerDeserializer integerDeserializer, + JsonDeserializer chatRoomDeserializer) + { + Map properties = new HashMap<>(); + defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ConsumerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER"); + properties.put( + ConsumerConfig.GROUP_ID_CONFIG, + "chat_room_channel"); + return new KafkaConsumer<>( + properties, + integerDeserializer, + chatRoomDeserializer); + } + + @Bean + IntegerDeserializer integerDeserializer() + { + return new IntegerDeserializer(); + } + + @Bean + JsonDeserializer chatRoomDeserializer() + { + JsonDeserializer deserializer = new JsonDeserializer<>(); + deserializer.configure( + Map.of( + JsonDeserializer.USE_TYPE_INFO_HEADERS, false, + JsonDeserializer.VALUE_DEFAULT_TYPE, ChatRoomTo.class, + JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()), + false ); + return deserializer; + } + + @Bean + ShardingStrategy shardingStrategy(ChatBackendProperties properties) + { + return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions()); + } + + @Bean + ChatMessageChannel chatMessageChannel( + ChatBackendProperties properties, + Producer chatMessageChannelProducer, + Consumer chatMessageChannelConsumer, + ZoneId zoneId) + { + return new ChatMessageChannel( + properties.getKafka().getMessageChannelTopic(), + chatMessageChannelProducer, + chatMessageChannelConsumer, + zoneId, + properties.getKafka().getNumPartitions()); + } + + @Bean + Producer chatMessageChannelProducer( + Properties defaultProducerProperties, + ChatBackendProperties chatBackendProperties, + StringSerializer stringSerializer, + JsonSerializer messageSerializer) + { + Map properties = new HashMap<>(); + defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ProducerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_PRODUCER"); + return new KafkaProducer<>( + properties, + stringSerializer, + messageSerializer); + } + + @Bean + StringSerializer stringSerializer() + { + return new StringSerializer(); + } + + @Bean + JsonSerializer chatMessageSerializer() + { + JsonSerializer serializer = new JsonSerializer<>(); + serializer.configure( + Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false), + false); + return serializer; + } + + @Bean + Consumer chatMessageChannelConsumer( + Properties defaultConsumerProperties, + ChatBackendProperties chatBackendProperties, + StringDeserializer stringDeserializer, + JsonDeserializer messageDeserializer) + { + Map properties = new HashMap<>(); + defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ConsumerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER"); + properties.put( + ConsumerConfig.GROUP_ID_CONFIG, + "chat_message_channel"); + return new KafkaConsumer<>( + properties, + stringDeserializer, + messageDeserializer); + } + + @Bean + StringDeserializer stringDeserializer() + { + return new StringDeserializer(); + } + + @Bean + JsonDeserializer chatMessageDeserializer() + { + JsonDeserializer deserializer = new JsonDeserializer<>(); + deserializer.configure( + Map.of( + JsonDeserializer.USE_TYPE_INFO_HEADERS, false, + JsonDeserializer.VALUE_DEFAULT_TYPE, MessageTo.class, + JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()), + false ); + return deserializer; + } + + @Bean + Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties) + { + Properties properties = new Properties(); + properties.setProperty( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + chatBackendProperties.getKafka().getBootstrapServers()); + return properties; + } + + @Bean + Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties) + { + Properties properties = new Properties(); + properties.setProperty( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + chatBackendProperties.getKafka().getBootstrapServers()); + properties.setProperty( + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, + "false"); + properties.setProperty( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + "earliest"); + return properties; + } + + @Bean + ZoneId zoneId() + { + return ZoneId.systemDefault(); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java deleted file mode 100644 index 43507792..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ /dev/null @@ -1,270 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.ChatBackendProperties; -import de.juplo.kafka.chat.backend.domain.ChatHome; -import de.juplo.kafka.chat.backend.domain.ShardingStrategy; -import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.IntegerDeserializer; -import org.apache.kafka.common.serialization.IntegerSerializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.support.serializer.JsonDeserializer; -import org.springframework.kafka.support.serializer.JsonSerializer; - -import java.time.Clock; -import java.time.ZoneId; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - - -@ConditionalOnProperty( - prefix = "chat.backend", - name = "services", - havingValue = "kafka") -@Configuration -public class KafkaServicesConfiguration -{ - @Bean - ChatHome kafkaChatHome( - ShardingStrategy shardingStrategy, - ChatMessageChannel chatMessageChannel) - { - return new KafkaChatHome(shardingStrategy, chatMessageChannel); - } - - @Bean - KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel) - { - return new KafkaChatRoomFactory(chatRoomChannel); - } - - @Bean - ChatRoomChannel chatRoomChannel( - ChatBackendProperties properties, - Producer chatRoomChannelProducer, - Consumer chatRoomChannelConsumer, - ShardingStrategy shardingStrategy, - ChatMessageChannel chatMessageChannel, - Clock clock) - { - return new ChatRoomChannel( - properties.getKafka().getChatroomChannelTopic(), - chatRoomChannelProducer, - chatRoomChannelConsumer, - shardingStrategy, - chatMessageChannel, - clock, - properties.getChatroomBufferSize()); - } - - @Bean - Producer chatRoomChannelProducer( - Properties defaultProducerProperties, - ChatBackendProperties chatBackendProperties, - IntegerSerializer integerSerializer, - JsonSerializer chatRoomSerializer) - { - Map properties = new HashMap<>(); - defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); - properties.put( - ProducerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER"); - return new KafkaProducer<>( - properties, - integerSerializer, - chatRoomSerializer); - } - - @Bean - IntegerSerializer integerSerializer() - { - return new IntegerSerializer(); - } - - @Bean - JsonSerializer chatRoomSerializer() - { - JsonSerializer serializer = new JsonSerializer<>(); - serializer.configure( - Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false), - false); - return serializer; - } - - @Bean - Consumer chatRoomChannelConsumer( - Properties defaultConsumerProperties, - ChatBackendProperties chatBackendProperties, - IntegerDeserializer integerDeserializer, - JsonDeserializer chatRoomDeserializer) - { - Map properties = new HashMap<>(); - defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); - properties.put( - ConsumerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER"); - properties.put( - ConsumerConfig.GROUP_ID_CONFIG, - "chat_room_channel"); - return new KafkaConsumer<>( - properties, - integerDeserializer, - chatRoomDeserializer); - } - - @Bean - IntegerDeserializer integerDeserializer() - { - return new IntegerDeserializer(); - } - - @Bean - JsonDeserializer chatRoomDeserializer() - { - JsonDeserializer deserializer = new JsonDeserializer<>(); - deserializer.configure( - Map.of( - JsonDeserializer.USE_TYPE_INFO_HEADERS, false, - JsonDeserializer.VALUE_DEFAULT_TYPE, ChatRoomTo.class, - JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()), - false ); - return deserializer; - } - - @Bean - ShardingStrategy shardingStrategy(ChatBackendProperties properties) - { - return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions()); - } - - @Bean - ChatMessageChannel chatMessageChannel( - ChatBackendProperties properties, - Producer chatMessageChannelProducer, - Consumer chatMessageChannelConsumer, - ZoneId zoneId) - { - return new ChatMessageChannel( - properties.getKafka().getMessageChannelTopic(), - chatMessageChannelProducer, - chatMessageChannelConsumer, - zoneId, - properties.getKafka().getNumPartitions()); - } - - @Bean - Producer chatMessageChannelProducer( - Properties defaultProducerProperties, - ChatBackendProperties chatBackendProperties, - StringSerializer stringSerializer, - JsonSerializer messageSerializer) - { - Map properties = new HashMap<>(); - defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); - properties.put( - ProducerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_PRODUCER"); - return new KafkaProducer<>( - properties, - stringSerializer, - messageSerializer); - } - - @Bean - StringSerializer stringSerializer() - { - return new StringSerializer(); - } - - @Bean - JsonSerializer chatMessageSerializer() - { - JsonSerializer serializer = new JsonSerializer<>(); - serializer.configure( - Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false), - false); - return serializer; - } - - @Bean - Consumer chatMessageChannelConsumer( - Properties defaultConsumerProperties, - ChatBackendProperties chatBackendProperties, - StringDeserializer stringDeserializer, - JsonDeserializer messageDeserializer) - { - Map properties = new HashMap<>(); - defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); - properties.put( - ConsumerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER"); - properties.put( - ConsumerConfig.GROUP_ID_CONFIG, - "chat_message_channel"); - return new KafkaConsumer<>( - properties, - stringDeserializer, - messageDeserializer); - } - - @Bean - StringDeserializer stringDeserializer() - { - return new StringDeserializer(); - } - - @Bean - JsonDeserializer chatMessageDeserializer() - { - JsonDeserializer deserializer = new JsonDeserializer<>(); - deserializer.configure( - Map.of( - JsonDeserializer.USE_TYPE_INFO_HEADERS, false, - JsonDeserializer.VALUE_DEFAULT_TYPE, MessageTo.class, - JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()), - false ); - return deserializer; - } - - @Bean - Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties) - { - Properties properties = new Properties(); - properties.setProperty( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, - chatBackendProperties.getKafka().getBootstrapServers()); - return properties; - } - - @Bean - Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties) - { - Properties properties = new Properties(); - properties.setProperty( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - chatBackendProperties.getKafka().getBootstrapServers()); - properties.setProperty( - ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, - "false"); - properties.setProperty( - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, - "earliest"); - return properties; - } - - @Bean - ZoneId zoneId() - { - return ZoneId.systemDefault(); - } -}