From fbbf91af3d2edf8e15b3322d9be300338fc2ed2f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 20 Apr 2023 09:45:10 +0200 Subject: [PATCH] NEU --- pom.xml | 4 + .../kafka/KafkaServicesConfiguration.java | 134 ++++++++++++++++-- 2 files changed, 129 insertions(+), 9 deletions(-) diff --git a/pom.xml b/pom.xml index 2819be45..3e3251cf 100644 --- a/pom.xml +++ b/pom.xml @@ -62,6 +62,10 @@ org.apache.kafka kafka-clients + + org.springframework.kafka + spring-kafka + org.springframework.boot spring-boot-starter-test diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index 5367fdf0..b11babc7 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -1,18 +1,19 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; -import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType; import de.juplo.kafka.chat.backend.domain.ChatHome; -import de.juplo.kafka.chat.backend.domain.ShardedChatHome; import de.juplo.kafka.chat.backend.domain.ShardingStrategy; -import de.juplo.kafka.chat.backend.domain.SimpleChatHome; import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; -import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService; -import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; @@ -20,12 +21,13 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.util.concurrent.ListenableFuture; import java.time.Clock; import java.time.ZoneId; -import java.util.Optional; +import java.util.Properties; import java.util.concurrent.CompletableFuture; @@ -122,6 +124,62 @@ public class KafkaServicesConfiguration implements ApplicationRunner properties.getChatroomBufferSize()); } + @Bean + Producer chatRoomChannelProducer( + Properties producerProperties, + IntegerSerializer integerSerializer, + JsonSerializer chatRoomSerializer) + { + return new KafkaProducer<>( + producerProperties, + integerSerializer, + chatRoomSerializer); + } + + @Bean + IntegerSerializer integerSerializer() + { + return new IntegerSerializer(); + } + + @Bean + JsonSerializer chatRoomSerializer() + { + JsonSerializer serializer = new JsonSerializer<>(); + return serializer; + } + + @Bean + Consumer chatRoomChannelConsumer( + Properties producerProperties, + IntegerDeserializer integerDeserializer, + JsonDeserializer chatRoomDeserializer) + { + return new KafkaConsumer<>( + producerProperties, + integerDeserializer, + chatRoomDeserializer); + } + + @Bean + IntegerDeserializer integerDeserializer() + { + return new IntegerDeserializer(); + } + + @Bean + JsonDeserializer chatRoomDeserializer() + { + JsonDeserializer deserializer = new JsonDeserializer<>(); + return deserializer; + } + + @Bean + ShardingStrategy shardingStrategy(ChatBackendProperties properties) + { + return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions()); + } + @Bean ChatMessageChannel chatMessageChannel( ChatBackendProperties properties, @@ -138,9 +196,67 @@ public class KafkaServicesConfiguration implements ApplicationRunner } @Bean - ShardingStrategy shardingStrategy(ChatBackendProperties properties) + Producer chatMessageChannelProducer( + Properties producerProperties, + StringSerializer stringSerializer, + JsonSerializer messageSerializer) { - return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions()); + return new KafkaProducer<>( + producerProperties, + stringSerializer, + messageSerializer); + } + + @Bean + StringSerializer stringSerializer() + { + return new StringSerializer(); + } + + @Bean + JsonSerializer chatMessageSerializer() + { + JsonSerializer serializer = new JsonSerializer<>(); + return serializer; + } + + @Bean + Consumer chatMessageChannelConsumer( + Properties producerProperties, + StringDeserializer stringDeserializer, + JsonDeserializer messageDeserializer) + { + return new KafkaConsumer<>( + producerProperties, + stringDeserializer, + messageDeserializer); + } + + @Bean + StringDeserializer stringDeserializer() + { + return new StringDeserializer(); + } + + @Bean + JsonDeserializer chatMessageDeserializer() + { + JsonDeserializer deserializer = new JsonDeserializer<>(); + return deserializer; + } + + @Bean + Properties producerProperties(ChatBackendProperties chatBackendProperties) + { + Properties properties = new Properties(); + return properties; + } + + @Bean + Properties consumerProperties(ChatBackendProperties chatBackendProperties) + { + Properties properties = new Properties(); + return properties; } @Bean -- 2.20.1