NEU
authorKai Moritz <kai@juplo.de>
Wed, 19 Apr 2023 19:54:25 +0000 (21:54 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 19 Apr 2023 19:54:25 +0000 (21:54 +0200)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/LoadInProgressException.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java

index bff38ae..69947a9 100644 (file)
@@ -12,7 +12,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.errors.WakeupException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -29,8 +28,8 @@ import java.util.stream.IntStream;
 public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
 {
   private final String topic;
-  private final Consumer<String, MessageTo> consumer;
   private final Producer<String, MessageTo> producer;
+  private final Consumer<String, MessageTo> consumer;
   private final ZoneId zoneId;
   private final int numShards;
   private final boolean[] isShardOwned;
@@ -46,8 +45,8 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
 
   public ChatMessageChannel(
     String topic,
-    Consumer<String, MessageTo> consumer,
     Producer<String, MessageTo> producer,
+    Consumer<String, MessageTo> consumer,
     ZoneId zoneId,
     int numShards)
   {
@@ -270,8 +269,12 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
     return Mono.justOrEmpty(chatrooms[shard].get(id));
   }
 
-  Flux<ChatRoom> getChatRooms(int shard)
+  Flux<ChatRoom> getChatRooms()
   {
-    return Flux.fromStream(chatrooms[shard].values().stream());
+    return Flux.fromStream(IntStream
+        .range(0, numShards)
+        .filter(shard -> isShardOwned[shard])
+        .mapToObj(shard -> Integer.valueOf(shard))
+        .flatMap(shard -> chatrooms[shard].values().stream()));
   }
 }
index ac44f99..f9568e7 100644 (file)
@@ -23,8 +23,8 @@ import java.util.UUID;
 public class ChatRoomChannel implements Runnable
 {
   private final String topic;
-  private final Consumer<Integer, ChatRoomTo> consumer;
   private final Producer<Integer, ChatRoomTo> producer;
+  private final Consumer<Integer, ChatRoomTo> consumer;
   private final ShardingStrategy shardingStrategy;
   private final ChatMessageChannel chatMessageChannel;
   private final Clock clock;
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java
new file mode 100644 (file)
index 0000000..88947a0
--- /dev/null
@@ -0,0 +1,49 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.*;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class KafkaChatHome implements ChatHome
+{
+  private final ShardingStrategy shardingStrategy;
+  private final ChatMessageChannel chatMessageChanel;
+
+
+  @Override
+  public Mono<ChatRoom> getChatRoom(UUID id)
+  {
+    int shard = shardingStrategy.selectShard(id);
+    if (chatMessageChanel.isLoadInProgress())
+    {
+      throw new LoadInProgressException(shard);
+    }
+    else
+    {
+      return chatMessageChanel.getChatRoom(shard, id);
+    }
+  }
+
+  @Override
+  public Flux<ChatRoom> getChatRooms()
+  {
+    if (chatMessageChanel.isLoadInProgress())
+    {
+      throw new LoadInProgressException();
+    }
+    else
+    {
+      return chatMessageChanel.getChatRooms();
+    }
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java
deleted file mode 100644 (file)
index 38aecd1..0000000
+++ /dev/null
@@ -1,51 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.*;
-
-
-@Slf4j
-public class KafkaChatHomeService implements ChatHomeService
-{
-  private final ChatMessageChannel chatMessageChanel;
-
-
-  public KafkaChatHomeService(ChatMessageChannel chatMessageChannel)
-  {
-    log.debug("Creating KafkaChatHomeService");
-    this.chatMessageChanel = chatMessageChannel;
-  }
-
-
-  @Override
-  public Mono<ChatRoom> getChatRoom(int shard, UUID id)
-  {
-    if (chatMessageChanel.isLoadInProgress())
-    {
-      throw new ShardNotOwnedException(shard);
-    }
-    else
-    {
-      return chatMessageChanel.getChatRoom(shard, id);
-    }
-  }
-
-  @Override
-  public Flux<ChatRoom> getChatRooms(int shard)
-  {
-    if (chatMessageChanel.isLoadInProgress())
-    {
-      throw new ShardNotOwnedException(shard);
-    }
-    else
-    {
-      return chatMessageChanel.getChatRooms(shard);
-    }
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/LoadInProgressException.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/LoadInProgressException.java
new file mode 100644 (file)
index 0000000..83e06bd
--- /dev/null
@@ -0,0 +1,17 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
+
+
+public class LoadInProgressException extends ShardNotOwnedException
+{
+  public LoadInProgressException()
+  {
+    this(-1);
+  }
+
+  public LoadInProgressException(int shard)
+  {
+    super(shard);
+  }
+}
index 91115ea..55aa6f8 100644 (file)
@@ -7,15 +7,17 @@ import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
 import java.time.Clock;
+import java.time.ZoneId;
 
 
 @ConditionalOnProperty(
@@ -27,61 +29,61 @@ public class KafkaServicesConfiguration implements ApplicationRunner
 {
   @Bean
   ChatHome kafkaChatHome(
-      ChatBackendProperties properties,
-      KafkaChatHomeService chatHomeService)
+      ShardingStrategy shardingStrategy,
+      ChatMessageChannel chatMessageChannel)
   {
-    int numShards = properties.getInmemory().getNumShards();
-    SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
-    for (int shard = 0; shard < numShards; shard++)
-    {
-
-    }
-        .read()
-        .subscribe(chatRoom ->
-        {
-          int shard = chatRoom.getShard();
-          if (chatHomes[shard] == null)
-            chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
-        });
-    ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
-    return new ShardedChatHome(chatHomes, strategy);
+    return new KafkaChatHome(shardingStrategy, chatMessageChannel);
   }
 
   @Bean
-  KafkaChatHomeService kafkaChatHomeService(ChatBackendProperties properties)
+  KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
   {
-    ShardingStrategyType sharding =
-        properties.getInmemory().getShardingStrategy();
-    int numShards = sharding == ShardingStrategyType.none
-        ? 1
-        : properties.getInmemory().getNumShards();
-    int[] ownedShards = sharding == ShardingStrategyType.none
-        ? new int[] { 0 }
-        : properties.getInmemory().getOwnedShards();
-    return new InMemoryChatHomeService(
-        numShards,
-        ownedShards,
-        storageStrategy.read());
+    return new KafkaChatRoomFactory(chatRoomChannel);
   }
 
   @Bean
-  InMemoryChatRoomFactory chatRoomFactory(
-      InMemoryChatHomeService service,
-      ShardingStrategy strategy,
-      Clock clock,
-      ChatBackendProperties properties)
+  ChatRoomChannel chatRoomChannel(
+      ChatBackendProperties properties,
+      Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
+      Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
+      ShardingStrategy shardingStrategy,
+      ChatMessageChannel chatMessageChannel,
+      Clock clock)
   {
-    return new InMemoryChatRoomFactory(
-        service,
-        strategy,
+    return new ChatRoomChannel(
+        properties.getKafka().getTopic(),
+        chatRoomChannelProducer,
+        chatRoomChannelConsumer,
+        shardingStrategy,
+        chatMessageChannel,
         clock,
         properties.getChatroomBufferSize());
   }
 
   @Bean
-  ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
+  ChatMessageChannel chatMessageChannel(
+      ChatBackendProperties properties,
+      Producer<String, MessageTo> chatMessageChannelProducer,
+      Consumer<String, MessageTo> chatMessageChannelConsumer,
+      ZoneId zoneId)
   {
-    return new KafkaLikeShardingStrategy(
+    return new ChatMessageChannel(
+        properties.getKafka().getTopic(),
+        chatMessageChannelProducer,
+        chatMessageChannelConsumer,
+        zoneId,
         properties.getKafka().getNumPartitions());
   }
+
+  @Bean
+  ShardingStrategy shardingStrategy(ChatBackendProperties properties)
+  {
+    return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
+  }
+
+  @Bean
+  ZoneId zoneId()
+  {
+    return ZoneId.systemDefault();
+  }
 }