NG - kompiliert!
authorKai Moritz <kai@juplo.de>
Fri, 18 Aug 2023 14:19:22 +0000 (16:19 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 18 Aug 2023 15:18:41 +0000 (17:18 +0200)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java

index 7189362..77790bd 100644 (file)
@@ -21,15 +21,6 @@ public class KafkaChatHome implements ChatHome
   private final ChatMessageChannel chatMessageChanel;
 
 
-  public KafkaChatHome(
-      int numPartitions,
-      ChatMessageChannel chatMessageChannel)
-  {
-    this.shardingStrategy = new KafkaLikeShardingStrategy(numPartitions);
-    this.chatMessageChanel = chatMessageChannel;
-  }
-
-
   @Override
   public Mono<ChatRoom> getChatRoom(UUID id)
   {
index 9e1f75e..4ebed8d 100644 (file)
@@ -36,35 +36,16 @@ public class KafkaServicesConfiguration
 {
   @Bean
   ChatHome kafkaChatHome(
-      ShardingStrategy shardingStrategy,
+      KafkaLikeShardingStrategy shardingStrategy,
       ChatMessageChannel chatMessageChannel)
   {
     return new KafkaChatHome(shardingStrategy, chatMessageChannel);
   }
 
   @Bean
-  KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
+  KafkaChatRoomFactory chatRoomFactory(ChatMessageChannel chatMessageChannel)
   {
-    return new KafkaChatRoomFactory(chatRoomChannel);
-  }
-
-  @Bean
-  ChatRoomChannel chatRoomChannel(
-      ChatBackendProperties properties,
-      Producer<Integer, CreateChatRoomRequestTo> chatRoomChannelProducer,
-      Consumer<Integer, CreateChatRoomRequestTo> chatRoomChannelConsumer,
-      ShardingStrategy shardingStrategy,
-      ChatMessageChannel chatMessageChannel,
-      Clock clock)
-  {
-    return new ChatRoomChannel(
-        properties.getKafka().getChatroomChannelTopic(),
-        chatRoomChannelProducer,
-        chatRoomChannelConsumer,
-        shardingStrategy,
-        chatMessageChannel,
-        clock,
-        properties.getChatroomBufferSize());
+    return new KafkaChatRoomFactory(chatMessageChannel);
   }
 
   @Bean
@@ -86,63 +67,7 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  IntegerSerializer integerSerializer()
-  {
-    return new IntegerSerializer();
-  }
-
-  @Bean
-  JsonSerializer<CreateChatRoomRequestTo> chatRoomSerializer()
-  {
-    JsonSerializer<CreateChatRoomRequestTo> serializer = new JsonSerializer<>();
-    serializer.configure(
-        Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
-        false);
-    return serializer;
-  }
-
-  @Bean
-  Consumer<Integer, CreateChatRoomRequestTo>  chatRoomChannelConsumer(
-      Properties defaultConsumerProperties,
-      ChatBackendProperties chatBackendProperties,
-      IntegerDeserializer integerDeserializer,
-      JsonDeserializer<CreateChatRoomRequestTo> chatRoomDeserializer)
-  {
-    Map<String, Object> properties = new HashMap<>();
-    defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
-    properties.put(
-        ConsumerConfig.CLIENT_ID_CONFIG,
-        chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
-    properties.put(
-        ConsumerConfig.GROUP_ID_CONFIG,
-        "chat_room_channel");
-    return new KafkaConsumer<>(
-        properties,
-        integerDeserializer,
-        chatRoomDeserializer);
-  }
-
-  @Bean
-  IntegerDeserializer integerDeserializer()
-  {
-    return new IntegerDeserializer();
-  }
-
-  @Bean
-  JsonDeserializer<CreateChatRoomRequestTo> chatRoomDeserializer()
-  {
-    JsonDeserializer<CreateChatRoomRequestTo> deserializer = new JsonDeserializer<>();
-    deserializer.configure(
-        Map.of(
-            JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
-            JsonDeserializer.VALUE_DEFAULT_TYPE, CreateChatRoomRequestTo.class,
-            JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
-        false );
-    return deserializer;
-  }
-
-  @Bean
-  ShardingStrategy shardingStrategy(ChatBackendProperties properties)
+  KafkaLikeShardingStrategy shardingStrategy(ChatBackendProperties properties)
   {
     return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
   }
@@ -152,14 +77,17 @@ public class KafkaServicesConfiguration
       ChatBackendProperties properties,
       Producer<String, AbstractTo> chatMessageChannelProducer,
       Consumer<String, AbstractTo> chatMessageChannelConsumer,
-      ZoneId zoneId)
+      ZoneId zoneId,
+      Clock clock)
   {
     return new ChatMessageChannel(
         properties.getKafka().getMessageChannelTopic(),
         chatMessageChannelProducer,
         chatMessageChannelConsumer,
         zoneId,
-        properties.getKafka().getNumPartitions());
+        properties.getKafka().getNumPartitions(),
+        properties.getChatroomBufferSize(),
+        clock);
   }
 
   @Bean