WIP
authorKai Moritz <kai@juplo.de>
Sat, 16 Sep 2023 19:56:16 +0000 (21:56 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 16 Sep 2023 19:56:16 +0000 (21:56 +0200)
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java

index 89a3c11..bc6f103 100644 (file)
@@ -34,8 +34,7 @@ public class InMemoryServicesConfiguration
     return new SimpleChatHomeService(
         storageStrategy,
         clock,
-        properties.getChatroomBufferSize(),
-        properties.getInstanceUri());
+        properties.getChatroomBufferSize());
   }
 
   @Bean
@@ -58,7 +57,10 @@ public class InMemoryServicesConfiguration
             clock,
             properties.getChatroomBufferSize()));
     ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
-    return new ShardedChatHomeService(chatHomes, strategy);
+    return new ShardedChatHomeService(
+        chatHomes,
+        properties.getInmemory().getShardOwners(),
+        strategy);
   }
 
   @ConditionalOnProperty(
index b7268a8..c281d9e 100644 (file)
@@ -8,6 +8,7 @@ import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.net.URI;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
@@ -20,14 +21,20 @@ public class ShardedChatHomeService implements ChatHomeService
 {
   private final SimpleChatHomeService[] chatHomes;
   private final Set<Integer> ownedShards;
+  private final String[] shardOwners;
   private final ShardingStrategy shardingStrategy;
 
 
   public ShardedChatHomeService(
       SimpleChatHomeService[] chatHomes,
+      URI[] shardOwners,
       ShardingStrategy shardingStrategy)
   {
     this.chatHomes = chatHomes;
+    this.shardOwners = Arrays
+        .stream(shardOwners)
+        .map(uri -> uri.toASCIIString())
+        .toArray(size -> new String[size]);
     this.shardingStrategy = shardingStrategy;
     this.ownedShards = new HashSet<>();
     for (int shard = 0; shard < chatHomes.length; shard++)
@@ -94,10 +101,7 @@ public class ShardedChatHomeService implements ChatHomeService
   @Override
   public Mono<String[]> getShardOwners()
   {
-    return Mono.just(Arrays
-        .stream(chatHomes)
-        .map(chatHome -> chatHome.getInstanceUri())
-        .toArray(size -> new String[size]));
+    return Mono.just(shardOwners);
   }
 
   private int selectShard(UUID chatroomId)
index 4cb490d..cf6d20a 100644 (file)
@@ -3,12 +3,10 @@ package de.juplo.kafka.chat.backend.implementation.inmemory;
 import de.juplo.kafka.chat.backend.domain.*;
 import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
-import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-import java.net.URI;
 import java.time.Clock;
 import java.util.*;
 
@@ -21,31 +19,26 @@ public class SimpleChatHomeService implements ChatHomeService
   private final Map<UUID, ChatRoomData> chatRoomData;
   private final Clock clock;
   private final int bufferSize;
-  @Getter
-  private final URI instanceUri;
 
 
 
   public SimpleChatHomeService(
       StorageStrategy storageStrategy,
       Clock clock,
-      int bufferSize,
-      URI instanceUri)
+      int bufferSize)
   {
     this(
         null,
         storageStrategy,
         clock,
-        bufferSize,
-        instanceUri);
+        bufferSize);
   }
 
   public SimpleChatHomeService(
       Integer shard,
       StorageStrategy storageStrategy,
       Clock clock,
-      int bufferSize,
-      URI instanceUri)
+      int bufferSize)
   {
     log.info("Created SimpleChatHome for shard {}", shard);
 ;
@@ -85,7 +78,6 @@ public class SimpleChatHomeService implements ChatHomeService
         });
     this.clock = clock;
     this.bufferSize = bufferSize;
-    this.instanceUri = instanceUri;
   }
 
 
index 8756a06..4711dbd 100644 (file)
@@ -31,6 +31,7 @@ public class InfoChannel implements Runnable
   private final Producer<String, AbstractMessageTo> producer;
   private final Consumer<String, AbstractMessageTo> consumer;
   private final int numShards;
+  private final String[] shardOwners;
   private final long[] currentOffset;
   private final long[] nextOffset;
   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
@@ -56,6 +57,7 @@ public class InfoChannel implements Runnable
     this.numShards = consumer
         .partitionsFor(topic)
         .size();
+    this.shardOwners = new String[numShards];
     this.currentOffset = new long[numShards];
     this.nextOffset = new long[numShards];
     IntStream
@@ -221,11 +223,21 @@ public class InfoChannel implements Runnable
         case EVENT_SHARD_ASSIGNED:
           EventShardAssigned eventShardAssigned =
               (EventShardAssigned) record.value();
+          log.info(
+              "Shard {} was assigned to {}",
+              eventShardAssigned.getShard(),
+              eventShardAssigned.getUri());
+          shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri();
           break;
 
         case EVENT_SHARD_REVOKED:
           EventShardRevoked eventShardRevoked =
               (EventShardRevoked) record.value();
+          log.info(
+              "Shard {} was revoked from {}",
+              eventShardRevoked.getShard(),
+              eventShardRevoked.getUri());
+          shardOwners[eventShardRevoked.getShard()] = null;
           break;
 
         default:
@@ -272,4 +284,9 @@ public class InfoChannel implements Runnable
   {
     return Mono.fromSupplier(() -> chatRoomInfo.get(id));
   }
+
+  Mono<String[]> getShardOwners()
+  {
+    return Mono.just(shardOwners);
+  }
 }
index 9832519..2984d8c 100644 (file)
@@ -61,6 +61,12 @@ public class KafkaChatHomeService implements ChatHomeService
             dataChannel.getOwnedShards())));
   }
 
+  @Override
+  public Mono<String[]> getShardOwners()
+  {
+    infoChannel.getShardOwners();
+  }
+
   int selectShard(UUID chatRoomId)
   {
     byte[] serializedKey = chatRoomId.toString().getBytes();