WIP:ALIGN
authorKai Moritz <kai@juplo.de>
Tue, 12 Sep 2023 21:33:59 +0000 (23:33 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 12 Sep 2023 21:33:59 +0000 (23:33 +0200)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java

index d94bc65..3f4faa5 100644 (file)
@@ -25,7 +25,7 @@ import java.util.stream.IntStream;
 
 
 @Slf4j
-public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
+public class DataChannel implements Runnable, ConsumerRebalanceListener
 {
   private final String topic;
   private final Producer<String, AbstractMessageTo> producer;
@@ -37,7 +37,6 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
   private final boolean[] isShardOwned;
   private final long[] currentOffset;
   private final long[] nextOffset;
-  private final Map<UUID, ChatRoomInfo>[] chatRoomInfo;
   private final Map<UUID, ChatRoomData>[] chatRoomData;
 
   private boolean running;
@@ -45,7 +44,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
   private volatile boolean loadInProgress;
 
 
-  public ChatRoomChannel(
+  public DataChannel(
     String topic,
     Producer<String, AbstractMessageTo> producer,
     Consumer<String, AbstractMessageTo> consumer,
@@ -68,13 +67,11 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     this.isShardOwned = new boolean[numShards];
     this.currentOffset = new long[numShards];
     this.nextOffset = new long[numShards];
-    this.chatRoomInfo = new Map[numShards];
     this.chatRoomData = new Map[numShards];
     IntStream
         .range(0, numShards)
         .forEach(shard ->
         {
-          this.chatRoomInfo[shard] = new HashMap<>();
           this.chatRoomData[shard] = new HashMap<>();
         });
   }
@@ -402,27 +399,4 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
 
     return Mono.justOrEmpty(chatRoomData[shard].get(id));
   }
-
-  Flux<ChatRoomInfo> getChatRoomInfo()
-  {
-    return Flux
-        .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
-        .filter(shard -> isShardOwned[shard])
-        .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values()));
-  }
-
-  Mono<ChatRoomInfo> getChatRoomInfo(int shard, UUID id)
-  {
-    if (loadInProgress)
-    {
-      return Mono.error(new LoadInProgressException());
-    }
-
-    if (!isShardOwned[shard])
-    {
-      return Mono.error(new ShardNotOwnedException(shard));
-    }
-
-    return Mono.justOrEmpty(chatRoomInfo[shard].get(id));
-  }
 }
index d94bc65..a3a5b43 100644 (file)
@@ -1,6 +1,8 @@
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
-import de.juplo.kafka.chat.backend.domain.*;
+import de.juplo.kafka.chat.backend.domain.ChatRoomData;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
@@ -20,12 +22,15 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.time.*;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
 import java.util.stream.IntStream;
 
 
 @Slf4j
-public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
+public class InfoChannel implements Runnable, ConsumerRebalanceListener
 {
   private final String topic;
   private final Producer<String, AbstractMessageTo> producer;
@@ -45,7 +50,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
   private volatile boolean loadInProgress;
 
 
-  public ChatRoomChannel(
+  public InfoChannel(
     String topic,
     Producer<String, AbstractMessageTo> producer,
     Consumer<String, AbstractMessageTo> consumer,