From: Kai Moritz Date: Sun, 18 Feb 2024 20:46:56 +0000 (+0100) Subject: refactor: Added success- and failure-callbacks for `ChatHomeService` X-Git-Tag: rebase--2024-02-20--09-03~12 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=8f54ff16fa679267349aad7afd79a1101d66795e;p=demos%2Fkafka%2Fchat refactor: Added success- and failure-callbacks for `ChatHomeService` --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java index 8f91ad02..019db657 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java @@ -17,10 +17,23 @@ public interface StorageStrategy Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName()); default void write(ChatHomeService chatHomeService) + { + write( + chatHomeService, + this::logSuccessChatHomeService, + this::logFailureChatHomeService); + } + + default void write( + ChatHomeService chatHomeService, + ChatHomeServiceWrittenSuccessCallback successCallback, + ChatHomeServiceWrittenFailureCallback failureCallback) { writeChatRoomInfo( chatHomeService .getChatRoomInfo() + .doOnComplete(() -> successCallback.accept(chatHomeService)) + .doOnError(throwable -> failureCallback.accept(chatHomeService, throwable)) .doOnNext(chatRoomInfo -> writeChatRoomData( chatRoomInfo.getId(), @@ -48,6 +61,19 @@ public interface StorageStrategy void writeChatRoomData(UUID chatRoomId, Flux messageFlux); Flux readChatRoomData(UUID chatRoomId); + interface ChatHomeServiceWrittenSuccessCallback extends Consumer {} + interface ChatHomeServiceWrittenFailureCallback extends BiConsumer {} + + default void logSuccessChatHomeService(ChatHomeService chatHomeService) + { + log.info("Successfully stored {}", chatHomeService); + } + + default void logFailureChatHomeService(ChatHomeService chatHomeService, Throwable throwable) + { + log.error("Could not store {}: {}", chatHomeService, throwable); + } + interface ChatRoomWrittenSuccessCallback extends Consumer {} interface ChatRoomWrittenFailureCallback extends BiConsumer {} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java index ab7f8d43..9a384533 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java @@ -43,12 +43,7 @@ public class ShardedChatHomeService implements ChatHomeService for (int shard = 0; shard < chatHomes.length; shard++) if(chatHomes[shard] != null) this.ownedShards.add(shard); - log.info( - "Created ShardedChatHome for shards: {}", - ownedShards - .stream() - .map(String::valueOf) - .collect(Collectors.joining(", "))); + log.info("Created {}", this); } @@ -111,4 +106,18 @@ public class ShardedChatHomeService implements ChatHomeService { return shardingStrategy.selectShard(chatroomId); } + + @Override + public String toString() + { + StringBuffer stringBuffer = new StringBuffer(ShardedChatHomeService.class.getSimpleName()); + stringBuffer.append(", shards=["); + stringBuffer.append(ownedShards + .stream() + .sorted() + .map(String::valueOf) + .collect(Collectors.joining(","))); + stringBuffer.append("]"); + return stringBuffer.toString(); + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java index cf6d20a5..5ed039ea 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java @@ -40,9 +40,10 @@ public class SimpleChatHomeService implements ChatHomeService Clock clock, int bufferSize) { - log.info("Created SimpleChatHome for shard {}", shard); ; this.shard = shard; + log.info("Created {}", this); + this.chatRoomInfo = new HashMap<>(); this.chatRoomData = new HashMap<>(); storageStrategy @@ -120,4 +121,10 @@ public class SimpleChatHomeService implements ChatHomeService { return Mono.empty(); } + + @Override + public String toString() + { + return SimpleChatHomeService.class.getSimpleName() + ", shard=" + shard; + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 03eaabf8..528a0e68 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -7,10 +7,7 @@ import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessage import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; @@ -330,4 +327,9 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); return new ChatRoomData(clock, service, bufferSize); } + + ConsumerGroupMetadata getConsumerGroupMetadata() + { + return consumer.groupMetadata(); + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java index eb03d593..3a87318d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java @@ -72,4 +72,13 @@ public class KafkaChatHomeService implements ChatHomeService byte[] serializedKey = chatRoomId.toString().getBytes(); return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions; } + + @Override + public String toString() + { + StringBuffer stringBuffer = new StringBuffer(KafkaChatHomeService.class.getSimpleName()); + stringBuffer.append(", "); + stringBuffer.append(dataChannel.getConsumerGroupMetadata()); + return stringBuffer.toString(); + } }