X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FKafkaChatHomeService.java;h=3a87318d62975be319cd33041879f94e90634a09;hb=8300dcd98f681893a077051560151a8f1b94e38d;hp=d3321d71d0c5e968a62898db3fcf76557e3edc71;hpb=c0b341d3e1ad8eb2ba374d4d21c127b701a726bd;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java index d3321d71..3a87318d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; +package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.domain.ChatRoomData; @@ -18,55 +18,67 @@ import java.util.*; public class KafkaChatHomeService implements ChatHomeService { private final int numPartitions; - private final ChatRoomChannel chatRoomChannel; + private final InfoChannel infoChannel; + private final DataChannel dataChannel; @Override public Mono createChatRoom(UUID id, String name) { - log.info("Sending create-command for chat rooom: id={}, name={}"); - return chatRoomChannel.sendCreateChatRoomRequest(id, name); + int shard = selectShard(id); + log.info( + "Sending create-command for chat rooom: id={}, name={}, shard={}", + id, + name, + shard); + return infoChannel.sendChatRoomCreatedEvent(id, name, shard); } @Override public Mono getChatRoomInfo(UUID id) { - int shard = selectShard(id); - return chatRoomChannel - .getChatRoomInfo(shard, id) - .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( - id, - shard, - chatRoomChannel.getOwnedShards()))); + return infoChannel + .getChatRoomInfo(id) + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); } @Override public Flux getChatRoomInfo() { - return chatRoomChannel.getChatRoomInfo(); + return infoChannel.getChatRoomInfo(); } @Override public Mono getChatRoomData(UUID id) { int shard = selectShard(id); - return chatRoomChannel + return dataChannel .getChatRoomData(shard, id) .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( id, shard, - chatRoomChannel.getOwnedShards()))); + dataChannel.getOwnedShards()))); } - public Flux getChatRoomData() + @Override + public Mono getShardOwners() { - return chatRoomChannel.getChatRoomData(); + return infoChannel.getShardOwners(); } - int selectShard(UUID chatRoomId) + private int selectShard(UUID chatRoomId) { byte[] serializedKey = chatRoomId.toString().getBytes(); return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions; } + + @Override + public String toString() + { + StringBuffer stringBuffer = new StringBuffer(KafkaChatHomeService.class.getSimpleName()); + stringBuffer.append(", "); + stringBuffer.append(dataChannel.getConsumerGroupMetadata()); + return stringBuffer.toString(); + } }