WIP
authorKai Moritz <kai@juplo.de>
Sat, 16 Sep 2023 19:40:45 +0000 (21:40 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 16 Sep 2023 19:40:45 +0000 (21:40 +0200)
src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java
src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventShardAssigned.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventShardRevoked.java [new file with mode: 0644]

index 381c6c6..94e049b 100644 (file)
@@ -4,6 +4,7 @@ import lombok.Getter;
 import lombok.Setter;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 
+import java.net.URI;
 import java.nio.file.Paths;
 
 
@@ -25,7 +26,8 @@ public class ChatBackendProperties
   {
     private ShardingStrategyType shardingStrategy = ShardingStrategyType.none;
     private int numShards = 1;
-    private int[] ownedShards = new int[] { 0 };
+    private int[] ownedShards = new int[0];
+    private URI[] shardOwners = new URI[0];
     private StorageStrategyType storageStrategy = StorageStrategyType.none;
     private String storageDirectory = Paths.get(System.getProperty("java.io.tmpdir"),"chat", "backend").toString();
   }
@@ -34,6 +36,7 @@ public class ChatBackendProperties
   @Setter
   public static class KafkaServicesProperties
   {
+    private URI instanceUri = URI.create("http://localhost:8080");
     private String clientIdPrefix;
     private String bootstrapServers = ":9092";
     private String infoChannelTopic = "info_channel";
index 529be8d..f3efe79 100644 (file)
@@ -128,6 +128,12 @@ public class ChatBackendController
                 .build());
   }
 
+  @GetMapping("/shards")
+  public Mono<String[]> getShardOwners()
+  {
+    return chatHomeService.getShardOwners();
+  }
+
   @PostMapping("/store")
   public void store()
   {
index 388a20a..19fa26c 100644 (file)
@@ -15,4 +15,6 @@ public interface ChatHomeService
   Flux<ChatRoomInfo> getChatRoomInfo();
 
   Mono<ChatRoomData> getChatRoomData(UUID id);
+
+  Mono<String[]> getShardOwners();
 }
index f60b193..89a3c11 100644 (file)
@@ -34,7 +34,8 @@ public class InMemoryServicesConfiguration
     return new SimpleChatHomeService(
         storageStrategy,
         clock,
-        properties.getChatroomBufferSize());
+        properties.getChatroomBufferSize(),
+        properties.getInstanceUri());
   }
 
   @Bean
index 06c197b..b7268a8 100644 (file)
@@ -8,6 +8,7 @@ import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
@@ -90,6 +91,15 @@ public class ShardedChatHomeService implements ChatHomeService
                 : throwable);
   }
 
+  @Override
+  public Mono<String[]> getShardOwners()
+  {
+    return Mono.just(Arrays
+        .stream(chatHomes)
+        .map(chatHome -> chatHome.getInstanceUri())
+        .toArray(size -> new String[size]));
+  }
+
   private int selectShard(UUID chatroomId)
   {
     return shardingStrategy.selectShard(chatroomId);
index 30a181e..4cb490d 100644 (file)
@@ -3,10 +3,12 @@ package de.juplo.kafka.chat.backend.implementation.inmemory;
 import de.juplo.kafka.chat.backend.domain.*;
 import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.net.URI;
 import java.time.Clock;
 import java.util.*;
 
@@ -19,26 +21,31 @@ public class SimpleChatHomeService implements ChatHomeService
   private final Map<UUID, ChatRoomData> chatRoomData;
   private final Clock clock;
   private final int bufferSize;
+  @Getter
+  private final URI instanceUri;
 
 
 
   public SimpleChatHomeService(
       StorageStrategy storageStrategy,
       Clock clock,
-      int bufferSize)
+      int bufferSize,
+      URI instanceUri)
   {
     this(
         null,
         storageStrategy,
         clock,
-        bufferSize);
+        bufferSize,
+        instanceUri);
   }
 
   public SimpleChatHomeService(
       Integer shard,
       StorageStrategy storageStrategy,
       Clock clock,
-      int bufferSize)
+      int bufferSize,
+      URI instanceUri)
   {
     log.info("Created SimpleChatHome for shard {}", shard);
 ;
@@ -78,6 +85,7 @@ public class SimpleChatHomeService implements ChatHomeService
         });
     this.clock = clock;
     this.bufferSize = bufferSize;
+    this.instanceUri = instanceUri;
   }
 
 
@@ -114,4 +122,10 @@ public class SimpleChatHomeService implements ChatHomeService
         .justOrEmpty(chatRoomData.get(id))
         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
   }
+
+  @Override
+  public Mono<String[]> getShardOwners()
+  {
+    return Mono.empty();
+  }
 }
index da90663..4eedeb4 100644 (file)
@@ -138,6 +138,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
           currentOffset);
 
       consumer.seek(topicPartition, nextOffset[partition]);
+      infoChannel.sendShardAssignedEvent(partition);
     });
 
     consumer.resume(partitions);
@@ -151,6 +152,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
       int partition = topicPartition.partition();
       isShardOwned[partition] = false;
       log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
+      infoChannel.sendShardRevokedEvent(partition);
     });
   }
 
index 26e8696..8756a06 100644 (file)
@@ -3,16 +3,20 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.errors.WakeupException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.net.URI;
 import java.time.*;
 import java.util.HashMap;
 import java.util.Map;
@@ -30,6 +34,7 @@ public class InfoChannel implements Runnable
   private final long[] currentOffset;
   private final long[] nextOffset;
   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
+  private final String instanceUri;
 
   private boolean running;
 
@@ -37,7 +42,8 @@ public class InfoChannel implements Runnable
   public InfoChannel(
     String topic,
     Producer<String, AbstractMessageTo> producer,
-    Consumer<String, AbstractMessageTo> infoChannelConsumer)
+    Consumer<String, AbstractMessageTo> infoChannelConsumer,
+    URI instanceUri)
   {
     log.debug(
         "Creating InfoChannel for topic {}",
@@ -55,6 +61,8 @@ public class InfoChannel implements Runnable
     IntStream
         .range(0, numShards)
         .forEach(partition -> this.nextOffset[partition] = -1l);
+
+    this.instanceUri = instanceUri.toASCIIString();
   }
 
 
@@ -83,7 +91,7 @@ public class InfoChannel implements Runnable
       {
         if (metadata != null)
         {
-          log.info("Successfully sent chreate-request for chat room: {}", to);
+          log.info("Successfully sent created event for chat chat-room: {}", to);
           ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard);
           sink.success(chatRoomInfo);
         }
@@ -91,7 +99,7 @@ public class InfoChannel implements Runnable
         {
           // On send-failure
           log.error(
-              "Could not send create-request for chat room (id={}, name={}): {}",
+              "Could not send created event for chat-room (id={}, name={}): {}",
               chatRoomId,
               name,
               exception);
@@ -101,6 +109,70 @@ public class InfoChannel implements Runnable
     });
   }
 
+  Mono<RecordMetadata> sendShardAssignedEvent(int shard)
+  {
+    EventShardAssigned to = EventShardAssigned.of(shard, instanceUri);
+
+    return Mono.create(sink ->
+    {
+      ProducerRecord<String, AbstractMessageTo> record =
+          new ProducerRecord<>(
+              topic,
+              Integer.toString(shard),
+              to);
+
+      producer.send(record, ((metadata, exception) ->
+      {
+        if (metadata != null)
+        {
+          log.info("Successfully sent shard assigned event for shard: {}", shard);
+          sink.success(metadata);
+        }
+        else
+        {
+          // On send-failure
+          log.error(
+              "Could not send shard assigned event for shard {}: {}",
+              shard,
+              exception);
+          sink.error(exception);
+        }
+      }));
+    });
+  }
+
+  Mono<RecordMetadata> sendShardRevokedEvent(int shard)
+  {
+    EventShardRevoked to = EventShardRevoked.of(shard, instanceUri);
+
+    return Mono.create(sink ->
+    {
+      ProducerRecord<String, AbstractMessageTo> record =
+          new ProducerRecord<>(
+              topic,
+              Integer.toString(shard),
+              to);
+
+      producer.send(record, ((metadata, exception) ->
+      {
+        if (metadata != null)
+        {
+          log.info("Successfully sent shard revoked event for shard: {}", shard);
+          sink.success(metadata);
+        }
+        else
+        {
+          // On send-failure
+          log.error(
+              "Could not send shard revoked event for shard {}: {}",
+              shard,
+              exception);
+          sink.error(exception);
+        }
+      }));
+    });
+  }
+
 
   @Override
   public void run()
@@ -146,6 +218,16 @@ public class InfoChannel implements Runnable
           createChatRoom(eventChatRoomCreated.toChatRoomInfo());
           break;
 
+        case EVENT_SHARD_ASSIGNED:
+          EventShardAssigned eventShardAssigned =
+              (EventShardAssigned) record.value();
+          break;
+
+        case EVENT_SHARD_REVOKED:
+          EventShardRevoked eventShardRevoked =
+              (EventShardRevoked) record.value();
+          break;
+
         default:
           log.debug(
               "Ignoring message for key={} with offset={}: {}",
index 1aef3fd..843a2c4 100644 (file)
@@ -12,6 +12,8 @@ public class AbstractMessageTo
     COMMAND_CREATE_CHATROOM,
     EVENT_CHATMESSAGE_RECEIVED,
     EVENT_CHATROOM_CREATED,
+    EVENT_SHARD_ASSIGNED,
+    EVENT_SHARD_REVOKED,
   }
 
   @Getter
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventShardAssigned.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventShardAssigned.java
new file mode 100644 (file)
index 0000000..8ba23f3
--- /dev/null
@@ -0,0 +1,35 @@
+package de.juplo.kafka.chat.backend.implementation.kafka.messages.info;
+
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+
+@Getter
+@Setter
+@EqualsAndHashCode
+@ToString
+public class EventShardAssigned extends AbstractMessageTo
+{
+  private int shard;
+  private String uri;
+
+
+  public EventShardAssigned()
+  {
+    super(ToType.EVENT_SHARD_ASSIGNED);
+  }
+
+
+  public static EventShardAssigned of(
+      int shard,
+      String uri)
+  {
+    EventShardAssigned event = new EventShardAssigned();
+    event.setShard(shard);
+    event.setUri(uri);
+    return event;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventShardRevoked.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventShardRevoked.java
new file mode 100644 (file)
index 0000000..1e37f43
--- /dev/null
@@ -0,0 +1,35 @@
+package de.juplo.kafka.chat.backend.implementation.kafka.messages.info;
+
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+
+@Getter
+@Setter
+@EqualsAndHashCode
+@ToString
+public class EventShardRevoked extends AbstractMessageTo
+{
+  private int shard;
+  private String uri;
+
+
+  public EventShardRevoked()
+  {
+    super(ToType.EVENT_SHARD_REVOKED);
+  }
+
+
+  public static EventShardRevoked of(
+      int shard,
+      String uri)
+  {
+    EventShardRevoked event = new EventShardRevoked();
+    event.setShard(shard);
+    event.setUri(uri);
+    return event;
+  }
+}