WIP
authorKai Moritz <kai@juplo.de>
Sun, 26 Feb 2023 14:22:02 +0000 (15:22 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 26 Feb 2023 14:35:29 +0000 (15:35 +0100)
src/main/java/de/juplo/kafka/chat/backend/domain/ShardNotOwnedException.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java

index d467eab..452f26d 100644 (file)
@@ -1,68 +1,18 @@
 package de.juplo.kafka.chat.backend.domain;
 
 import lombok.Getter;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.stream.Collectors;
+import org.apache.kafka.common.TopicPartition;
 
 
 public class ShardNotOwnedException extends IllegalStateException
 {
   @Getter
-  private final ChatHomeService chatHomeService;
-  @Getter
-  private final ChatRoomInfo chatRoomInfo;
-  @Getter
-  private final int shard;
-  @Getter
-  private final int[] ownedShards;
-
-
-  public ShardNotOwnedException(
-      ChatHomeService chatHomeService,
-      ChatRoomInfo chatRoomInfo,
-      int shard,
-      Collection<Integer> ownedShards)
-  {
-    this(
-        chatHomeService,
-        chatRoomInfo,
-        shard,
-        ShardNotOwnedException.toArray(ownedShards));
-  }
-
-  public ShardNotOwnedException(
-      ChatHomeService chatHomeService,
-      ChatRoomInfo chatRoomInfo,
-      int shard,
-      int[] ownedShards)
-  {
-    super(
-        chatHomeService +
-        " does not own the shard " +
-        shard +
-        " for ChatRoom " +
-        chatRoomInfo +
-        " owned shards: " +
-        Arrays
-            .stream(ownedShards)
-            .mapToObj(ownedShard -> Integer.toString(ownedShard))
-            .collect(Collectors.joining(", ")));
-    this.chatHomeService = chatHomeService;
-    this.chatRoomInfo = chatRoomInfo;
-    this.shard = shard;
-    this.ownedShards = ownedShards;
-  }
+  private final TopicPartition topicPartition;
 
 
-  private static int[] toArray(Collection<Integer> collection)
+  public ShardNotOwnedException(TopicPartition topicPartition)
   {
-    int[] array = new int[collection.size()];
-    Iterator<Integer> iterator = collection.iterator();
-    for (int i = 0; iterator.hasNext(); i++)
-      array[i] = iterator.next();
-    return array;
+    super("This instance does not own the shard for " + topicPartition);
+    this.topicPartition = topicPartition;
   }
 }
index af99b80..7e55473 100644 (file)
@@ -3,6 +3,9 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 import de.juplo.kafka.chat.backend.domain.Message;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
+import reactor.core.publisher.Mono;
+
+import java.time.LocalDateTime;
 
 
 class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy
@@ -27,9 +30,8 @@ class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy
   }
 
   @Override
-  public MessageHandlingStrategy handleMessage(Message message)
+  public Mono<Message> handleMessage(Message.MessageKey key, LocalDateTime timestamp, String text)
   {
-    // todo
-    return this;
+    return null;
   }
 }
index 892da90..0da4111 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
 import lombok.RequiredArgsConstructor;
 import org.apache.kafka.common.TopicPartition;
 import reactor.core.publisher.Mono;
@@ -11,7 +12,7 @@ import java.time.LocalDateTime;
 @RequiredArgsConstructor
 class ChatroomInactiveMessageHandlingStrategy implements MessageHandlingStrategy
 {
-  private final TopicPartition tp;
+  private final TopicPartition topicPartition;
 
   @Override
   public Mono<Message> handleMessage(
@@ -19,6 +20,6 @@ class ChatroomInactiveMessageHandlingStrategy implements MessageHandlingStrategy
       LocalDateTime timestamp,
       String text)
   {
-    throw new 
+    throw new ShardNotOwnedException(topicPartition);
   }
 }