NEU
authorKai Moritz <kai@juplo.de>
Sun, 16 Apr 2023 09:25:26 +0000 (11:25 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 16 Apr 2023 09:25:26 +0000 (11:25 +0200)
src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java [new file with mode: 0644]

index 177d4f5..4724f6b 100644 (file)
@@ -16,6 +16,7 @@ public class ChatBackendProperties
   private int chatroomBufferSize = 8;
   private ServiceType services = ServiceType.inmemory;
   private InMemoryServicesProperties inmemory = new InMemoryServicesProperties();
+  private KafkaServicesProperties kafka = new KafkaServicesProperties();
 
 
   @Getter
@@ -29,6 +30,14 @@ public class ChatBackendProperties
     private String storageDirectory = Paths.get(System.getProperty("java.io.tmpdir"),"chat", "backend").toString();
   }
 
+  @Getter
+  @Setter
+  public static class KafkaServicesProperties
+  {
+    private String topic = "test";
+    private int numPartitions = 2;
+  }
+
   public enum ServiceType { inmemory }
   public enum StorageStrategyType { files, mongodb }
   public enum ShardingStrategyType { none, kafkalike }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java
deleted file mode 100644 (file)
index 84484d9..0000000
+++ /dev/null
@@ -1,68 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
-import reactor.core.publisher.Mono;
-
-import java.time.LocalDateTime;
-import java.time.ZoneOffset;
-import java.util.UUID;
-
-
-/**
- * Derzeit eigentlich einzige aktive Strategie!
- * Rückbau?!?!
- */
-@RequiredArgsConstructor
-@Slf4j
-class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
-{
-  private final KafkaChatRoomService kafkaChatRoomService;
-  private final Producer<String, MessageTo> producer;
-  private final TopicPartition tp;
-  private final UUID chatRoomId;
-  private final ZoneOffset zoneOffset;
-
-
-  @Override
-  public Mono<Message> handleMessage(
-      Message.MessageKey key,
-      LocalDateTime timestamp,
-      String text)
-  {
-    return Mono.create(sink ->
-    {
-      ProducerRecord<String, MessageTo> record =
-          new ProducerRecord<>(
-              tp.topic(),
-              tp.partition(),
-              timestamp.toEpochSecond(zoneOffset),
-              chatRoomId.toString(),
-              MessageTo.of(key.getUsername(), key.getMessageId(), text));
-
-      producer.send(record, ((metadata, exception) ->
-      {
-        if (metadata != null)
-        {
-          // On successful send
-          {
-            // Emit new message
-            Message message = new Message(key, metadata.offset(), timestamp, text);
-            kafkaChatRoomService.addMessage(message);
-          }
-
-          sink.success();
-        }
-        else
-        {
-          // On send-failure
-          sink.error(exception);
-        }
-      }));
-    });
-  }
-}
index 3ca5b7f..22cd74b 100644 (file)
@@ -20,14 +20,12 @@ import reactor.core.publisher.Mono;
 
 import java.time.*;
 import java.util.*;
-import java.util.concurrent.ExecutorService;
 import java.util.stream.IntStream;
 
 
 @Slf4j
 public class KafkaChatHomeService implements ChatHomeService, Runnable, ConsumerRebalanceListener
 {
-  private final ExecutorService executorService;
   private final Consumer<String, MessageTo> consumer;
   private final Producer<String, MessageTo> producer;
   private final String topic;
@@ -36,7 +34,7 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer
   private final boolean[] isShardOwned;
   private final long[] currentOffset;
   private final long[] nextOffset;
-  private final Map<UUID, ChatRoom>[] chatRoomMaps;
+  private final Map<UUID, ChatRoom>[] chatrooms;
   private final KafkaLikeShardingStrategy shardingStrategy;
 
   private boolean running;
@@ -44,7 +42,6 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer
 
 
   public KafkaChatHomeService(
-    ExecutorService executorService,
     Consumer<String, MessageTo> consumer,
     Producer<String, MessageTo> producer,
     String topic,
@@ -52,7 +49,6 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer
     int numShards)
   {
     log.debug("Creating KafkaChatHomeService");
-    this.executorService = executorService;
     this.consumer = consumer;
     this.producer = producer;
     this.topic = topic;
@@ -61,7 +57,7 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer
     this.isShardOwned = new boolean[numShards];
     this.currentOffset = new long[numShards];
     this.nextOffset = new long[numShards];
-    this.chatRoomMaps = new Map[numShards];
+    this.chatrooms = new Map[numShards];
     this.shardingStrategy = new KafkaLikeShardingStrategy(numShards);
   }
 
@@ -167,7 +163,7 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer
 
       Message message = new Message(key, record.offset(), timestamp, messageTo.getText());
 
-      ChatRoom chatRoom = chatRoomMaps[record.partition()].get(chatRoomId);
+      ChatRoom chatRoom = chatrooms[record.partition()].get(chatRoomId);
       KafkaChatRoomService kafkaChatRoomService =
           (KafkaChatRoomService) chatRoom.getChatRoomService();
 
@@ -241,6 +237,12 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer
   }
 
 
+  public void putChatRoom(ChatRoom chatRoom)
+  {
+    // TODO: Nachricht senden!
+    chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
+  }
+
   @Override
   public Mono<ChatRoom> getChatRoom(int shard, UUID id)
   {
@@ -250,7 +252,7 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer
     }
     else
     {
-      return Mono.justOrEmpty(chatRoomMaps[shard].get(id));
+      return Mono.justOrEmpty(chatrooms[shard].get(id));
     }
   }
 
@@ -263,7 +265,7 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer
     }
     else
     {
-      return Flux.fromStream(chatRoomMaps[shard].values().stream());
+      return Flux.fromStream(chatrooms[shard].values().stream());
     }
   }
 }
index 20d85e8..f81d21f 100644 (file)
@@ -2,16 +2,32 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
+import de.juplo.kafka.chat.backend.domain.ChatRoomService;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Mono;
 
+import java.time.Clock;
 import java.util.UUID;
 
 
+@RequiredArgsConstructor
+@Slf4j
 public class KafkaChatRoomFactory implements ChatRoomFactory
 {
+  private final KafkaChatHomeService kafkaChatHomeService;
+  private final ShardingStrategy shardingStrategy;
+  private final Clock clock;
+  private final int bufferSize;
+
   @Override
   public Mono<ChatRoom> createChatRoom(UUID id, String name)
   {
-    return null;
+    log.info("Creating ChatRoom with buffer-size {}", bufferSize);
+    int shard = shardingStrategy.selectShard(id);
+    KafkaChatRoomService service = new KafkaChatRoomService(kafkaChatHomeService, id);
+    ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
+    kafkaChatHomeService.putChatRoom(chatRoom);
   }
 }
index 3a8c2c6..16ed3a7 100644 (file)
@@ -1,11 +1,8 @@
 package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
-import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.MessageMutationException;
-import lombok.RequiredArgsConstructor;
+import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java
new file mode 100644 (file)
index 0000000..fd42d9d
--- /dev/null
@@ -0,0 +1,78 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
+import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
+import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
+import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
+import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.time.Clock;
+
+
+@ConditionalOnProperty(
+    prefix = "chat.backend",
+    name = "services",
+    havingValue = "inmemory",
+    matchIfMissing = true)
+@Configuration
+public class KafkaServicesConfiguration implements ApplicationRunner
+{
+  @Bean
+  ChatHome kafkaChatHome(
+      ChatBackendProperties properties,
+      InMemoryChatHomeService chatHomeService,
+      StorageStrategy storageStrategy)
+  {
+    int numShards = properties.getInmemory().getNumShards();
+    SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
+    ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
+    return new ShardedChatHome(chatHomes, strategy);
+  }
+
+  @Bean
+  KafkaChatHomeService kafkaChatHomeService(ChatBackendProperties properties)
+  {
+    ShardingStrategyType sharding =
+        properties.getInmemory().getShardingStrategy();
+    int numShards = sharding == ShardingStrategyType.none
+        ? 1
+        : properties.getInmemory().getNumShards();
+    int[] ownedShards = sharding == ShardingStrategyType.none
+        ? new int[] { 0 }
+        : properties.getInmemory().getOwnedShards();
+    return new InMemoryChatHomeService(
+        numShards,
+        ownedShards,
+        storageStrategy.read());
+  }
+
+  @Bean
+  InMemoryChatRoomFactory chatRoomFactory(
+      InMemoryChatHomeService service,
+      ShardingStrategy strategy,
+      Clock clock,
+      ChatBackendProperties properties)
+  {
+    return new InMemoryChatRoomFactory(
+        service,
+        strategy,
+        clock,
+        properties.getChatroomBufferSize());
+  }
+
+  @Bean
+  ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
+  {
+    return new KafkaLikeShardingStrategy(
+        properties.getKafka().getNumPartitions());
+  }
+}