package de.juplo.kafka.chat.backend.persistence.kafka;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
-import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
-import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
-import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.util.concurrent.ListenableFuture;
import java.time.Clock;
import java.time.ZoneId;
-import java.util.Optional;
+import java.util.Properties;
import java.util.concurrent.CompletableFuture;
properties.getChatroomBufferSize());
}
+ @Bean
+ Producer<Integer, ChatRoomTo> chatRoomChannelProducer(
+ Properties producerProperties,
+ IntegerSerializer integerSerializer,
+ JsonSerializer<ChatRoomTo> chatRoomSerializer)
+ {
+ return new KafkaProducer<>(
+ producerProperties,
+ integerSerializer,
+ chatRoomSerializer);
+ }
+
+ @Bean
+ IntegerSerializer integerSerializer()
+ {
+ return new IntegerSerializer();
+ }
+
+ @Bean
+ JsonSerializer<ChatRoomTo> chatRoomSerializer()
+ {
+ JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
+ return serializer;
+ }
+
+ @Bean
+ Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer(
+ Properties producerProperties,
+ IntegerDeserializer integerDeserializer,
+ JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
+ {
+ return new KafkaConsumer<>(
+ producerProperties,
+ integerDeserializer,
+ chatRoomDeserializer);
+ }
+
+ @Bean
+ IntegerDeserializer integerDeserializer()
+ {
+ return new IntegerDeserializer();
+ }
+
+ @Bean
+ JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
+ {
+ JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
+ return deserializer;
+ }
+
+ @Bean
+ ShardingStrategy shardingStrategy(ChatBackendProperties properties)
+ {
+ return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
+ }
+
@Bean
ChatMessageChannel chatMessageChannel(
ChatBackendProperties properties,
}
@Bean
- ShardingStrategy shardingStrategy(ChatBackendProperties properties)
+ Producer<String, MessageTo> chatMessageChannelProducer(
+ Properties producerProperties,
+ StringSerializer stringSerializer,
+ JsonSerializer<MessageTo> messageSerializer)
{
- return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
+ return new KafkaProducer<>(
+ producerProperties,
+ stringSerializer,
+ messageSerializer);
+ }
+
+ @Bean
+ StringSerializer stringSerializer()
+ {
+ return new StringSerializer();
+ }
+
+ @Bean
+ JsonSerializer<MessageTo> chatMessageSerializer()
+ {
+ JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
+ return serializer;
+ }
+
+ @Bean
+ Consumer<String, MessageTo> chatMessageChannelConsumer(
+ Properties producerProperties,
+ StringDeserializer stringDeserializer,
+ JsonDeserializer<MessageTo> messageDeserializer)
+ {
+ return new KafkaConsumer<>(
+ producerProperties,
+ stringDeserializer,
+ messageDeserializer);
+ }
+
+ @Bean
+ StringDeserializer stringDeserializer()
+ {
+ return new StringDeserializer();
+ }
+
+ @Bean
+ JsonDeserializer<MessageTo> chatMessageDeserializer()
+ {
+ JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
+ return deserializer;
+ }
+
+ @Bean
+ Properties producerProperties(ChatBackendProperties chatBackendProperties)
+ {
+ Properties properties = new Properties();
+ return properties;
+ }
+
+ @Bean
+ Properties consumerProperties(ChatBackendProperties chatBackendProperties)
+ {
+ Properties properties = new Properties();
+ return properties;
}
@Bean