refactor: Added success- and failure-callbacks for `ChatHomeService`
authorKai Moritz <kai@juplo.de>
Sun, 18 Feb 2024 20:46:56 +0000 (21:46 +0100)
committerKai Moritz <kai@juplo.de>
Mon, 19 Feb 2024 13:06:27 +0000 (14:06 +0100)
src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java

index 8f91ad0..019db65 100644 (file)
@@ -17,10 +17,23 @@ public interface StorageStrategy
   Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName());
 
   default void write(ChatHomeService chatHomeService)
+  {
+    write(
+        chatHomeService,
+        this::logSuccessChatHomeService,
+        this::logFailureChatHomeService);
+  }
+
+  default void write(
+      ChatHomeService chatHomeService,
+      ChatHomeServiceWrittenSuccessCallback successCallback,
+      ChatHomeServiceWrittenFailureCallback failureCallback)
   {
     writeChatRoomInfo(
         chatHomeService
             .getChatRoomInfo()
+            .doOnComplete(() -> successCallback.accept(chatHomeService))
+            .doOnError(throwable -> failureCallback.accept(chatHomeService, throwable))
             .doOnNext(chatRoomInfo ->
                 writeChatRoomData(
                     chatRoomInfo.getId(),
@@ -48,6 +61,19 @@ public interface StorageStrategy
   void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
   Flux<Message> readChatRoomData(UUID chatRoomId);
 
+  interface ChatHomeServiceWrittenSuccessCallback extends Consumer<ChatHomeService> {}
+  interface ChatHomeServiceWrittenFailureCallback extends BiConsumer<ChatHomeService, Throwable> {}
+
+  default void logSuccessChatHomeService(ChatHomeService chatHomeService)
+  {
+    log.info("Successfully stored {}", chatHomeService);
+  }
+
+  default void logFailureChatHomeService(ChatHomeService chatHomeService, Throwable throwable)
+  {
+    log.error("Could not store {}: {}", chatHomeService, throwable);
+  }
+
   interface ChatRoomWrittenSuccessCallback extends Consumer<UUID> {}
   interface ChatRoomWrittenFailureCallback extends BiConsumer<UUID, Throwable> {}
 
index ab7f8d4..9a38453 100644 (file)
@@ -43,12 +43,7 @@ public class ShardedChatHomeService implements ChatHomeService
     for (int shard = 0; shard < chatHomes.length; shard++)
       if(chatHomes[shard] != null)
         this.ownedShards.add(shard);
-    log.info(
-        "Created ShardedChatHome for shards: {}",
-        ownedShards
-            .stream()
-            .map(String::valueOf)
-            .collect(Collectors.joining(", ")));
+    log.info("Created {}", this);
   }
 
 
@@ -111,4 +106,18 @@ public class ShardedChatHomeService implements ChatHomeService
   {
     return shardingStrategy.selectShard(chatroomId);
   }
+
+  @Override
+  public String toString()
+  {
+    StringBuffer stringBuffer = new StringBuffer(ShardedChatHomeService.class.getSimpleName());
+    stringBuffer.append(", shards=[");
+    stringBuffer.append(ownedShards
+        .stream()
+        .sorted()
+        .map(String::valueOf)
+        .collect(Collectors.joining(",")));
+    stringBuffer.append("]");
+    return stringBuffer.toString();
+  }
 }
index cf6d20a..5ed039e 100644 (file)
@@ -40,9 +40,10 @@ public class SimpleChatHomeService implements ChatHomeService
       Clock clock,
       int bufferSize)
   {
-    log.info("Created SimpleChatHome for shard {}", shard);
 ;
     this.shard = shard;
+    log.info("Created {}", this);
+
     this.chatRoomInfo = new HashMap<>();
     this.chatRoomData = new HashMap<>();
     storageStrategy
@@ -120,4 +121,10 @@ public class SimpleChatHomeService implements ChatHomeService
   {
     return Mono.empty();
   }
+
+  @Override
+  public String toString()
+  {
+    return SimpleChatHomeService.class.getSimpleName() + ", shard=" + shard;
+  }
 }
index 03eaabf..528a0e6 100644 (file)
@@ -7,10 +7,7 @@ import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessage
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
@@ -330,4 +327,9 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
     return new ChatRoomData(clock, service, bufferSize);
   }
+
+  ConsumerGroupMetadata getConsumerGroupMetadata()
+  {
+    return consumer.groupMetadata();
+  }
 }
index eb03d59..3a87318 100644 (file)
@@ -72,4 +72,13 @@ public class KafkaChatHomeService implements ChatHomeService
     byte[] serializedKey = chatRoomId.toString().getBytes();
     return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
   }
+
+  @Override
+  public String toString()
+  {
+    StringBuffer stringBuffer = new StringBuffer(KafkaChatHomeService.class.getSimpleName());
+    stringBuffer.append(", ");
+    stringBuffer.append(dataChannel.getConsumerGroupMetadata());
+    return stringBuffer.toString();
+  }
 }