NEU
authorKai Moritz <kai@juplo.de>
Thu, 20 Apr 2023 07:45:10 +0000 (09:45 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 20 Apr 2023 07:45:10 +0000 (09:45 +0200)
pom.xml
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java

diff --git a/pom.xml b/pom.xml
index 2819be4..3e3251c 100644 (file)
--- a/pom.xml
+++ b/pom.xml
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-test</artifactId>
index 5367fdf..b11babc 100644 (file)
@@ -1,18 +1,19 @@
 package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
 import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
-import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
-import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
-import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory;
 import jakarta.annotation.PreDestroy;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
@@ -20,12 +21,13 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.util.concurrent.ListenableFuture;
 
 import java.time.Clock;
 import java.time.ZoneId;
-import java.util.Optional;
+import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 
 
@@ -122,6 +124,62 @@ public class KafkaServicesConfiguration implements ApplicationRunner
         properties.getChatroomBufferSize());
   }
 
+  @Bean
+  Producer<Integer, ChatRoomTo>  chatRoomChannelProducer(
+      Properties producerProperties,
+      IntegerSerializer integerSerializer,
+      JsonSerializer<ChatRoomTo> chatRoomSerializer)
+  {
+    return new KafkaProducer<>(
+        producerProperties,
+        integerSerializer,
+        chatRoomSerializer);
+  }
+
+  @Bean
+  IntegerSerializer integerSerializer()
+  {
+    return new IntegerSerializer();
+  }
+
+  @Bean
+  JsonSerializer<ChatRoomTo> chatRoomSerializer()
+  {
+    JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
+    return serializer;
+  }
+
+  @Bean
+  Consumer<Integer, ChatRoomTo>  chatRoomChannelConsumer(
+      Properties producerProperties,
+      IntegerDeserializer integerDeserializer,
+      JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
+  {
+    return new KafkaConsumer<>(
+        producerProperties,
+        integerDeserializer,
+        chatRoomDeserializer);
+  }
+
+  @Bean
+  IntegerDeserializer integerDeserializer()
+  {
+    return new IntegerDeserializer();
+  }
+
+  @Bean
+  JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
+  {
+    JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
+    return deserializer;
+  }
+
+  @Bean
+  ShardingStrategy shardingStrategy(ChatBackendProperties properties)
+  {
+    return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
+  }
+
   @Bean
   ChatMessageChannel chatMessageChannel(
       ChatBackendProperties properties,
@@ -138,9 +196,67 @@ public class KafkaServicesConfiguration implements ApplicationRunner
   }
 
   @Bean
-  ShardingStrategy shardingStrategy(ChatBackendProperties properties)
+  Producer<String, MessageTo>  chatMessageChannelProducer(
+      Properties producerProperties,
+      StringSerializer stringSerializer,
+      JsonSerializer<MessageTo> messageSerializer)
   {
-    return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
+    return new KafkaProducer<>(
+        producerProperties,
+        stringSerializer,
+        messageSerializer);
+  }
+
+  @Bean
+  StringSerializer stringSerializer()
+  {
+    return new StringSerializer();
+  }
+
+  @Bean
+  JsonSerializer<MessageTo> chatMessageSerializer()
+  {
+    JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
+    return serializer;
+  }
+
+  @Bean
+  Consumer<String, MessageTo>  chatMessageChannelConsumer(
+      Properties producerProperties,
+      StringDeserializer stringDeserializer,
+      JsonDeserializer<MessageTo> messageDeserializer)
+  {
+    return new KafkaConsumer<>(
+        producerProperties,
+        stringDeserializer,
+        messageDeserializer);
+  }
+
+  @Bean
+  StringDeserializer stringDeserializer()
+  {
+    return new StringDeserializer();
+  }
+
+  @Bean
+  JsonDeserializer<MessageTo> chatMessageDeserializer()
+  {
+    JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
+    return deserializer;
+  }
+
+  @Bean
+  Properties producerProperties(ChatBackendProperties chatBackendProperties)
+  {
+    Properties properties = new Properties();
+    return properties;
+  }
+
+  @Bean
+  Properties consumerProperties(ChatBackendProperties chatBackendProperties)
+  {
+    Properties properties = new Properties();
+    return properties;
   }
 
   @Bean