X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FKafkaChatHomeService.java;h=9409716f5a1bb418d0a556b6c69fb57c62945127;hb=8d997cc65763b3f12fb680da67f471590e6eeeb2;hp=0769caf1f841f5b4e32661f90e790b70df92753b;hpb=76ea470b60f6664db4257b6935e1d5c848523ffc;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java index 0769caf1..9409716f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java @@ -18,50 +18,53 @@ import java.util.*; public class KafkaChatHomeService implements ChatHomeService { private final int numPartitions; - private final ChatRoomChannel chatRoomChannel; + private final InfoChannel infoChannel; + private final DataChannel dataChannel; @Override public Mono createChatRoom(UUID id, String name) { - log.info("Sending create-command for chat rooom: id={}, name={}"); - return chatRoomChannel.sendCreateChatRoomRequest(id, name); + int shard = selectShard(id); + log.info( + "Sending create-command for chat rooom: id={}, name={}, shard={}", + id, + name, + shard); + return infoChannel.sendChatRoomCreatedEvent(id, name, shard); } @Override public Mono getChatRoomInfo(UUID id) { - int shard = selectShard(id); - return chatRoomChannel - .getChatRoomInfo(shard, id) - .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( - id, - shard, - chatRoomChannel.getOwnedShards()))); + return infoChannel + .getChatRoomInfo(id) + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); } @Override public Flux getChatRoomInfo() { - return chatRoomChannel.getChatRoomInfo(); + return infoChannel.getChatRoomInfo(); } @Override public Mono getChatRoomData(UUID id) { int shard = selectShard(id); - return chatRoomChannel + return dataChannel .getChatRoomData(shard, id) .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( id, shard, - chatRoomChannel.getOwnedShards()))); + dataChannel.getOwnedShards()))); } - public Flux getChatRoomData() + @Override + public Mono getShardOwners() { - return chatRoomChannel.getChatRoomData(); + return infoChannel.getShardOwners(); } int selectShard(UUID chatRoomId)