-package de.juplo.kafka.chat.backend.persistence.kafka;
+package de.juplo.kafka.chat.backend.implementation.kafka;
import de.juplo.kafka.chat.backend.domain.ChatHomeService;
import de.juplo.kafka.chat.backend.domain.ChatRoomData;
public class KafkaChatHomeService implements ChatHomeService
{
private final int numPartitions;
- private final ChatRoomChannel chatRoomChannel;
+ private final InfoChannel infoChannel;
+ private final DataChannel dataChannel;
@Override
public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
{
- log.info("Sending create-command for chat rooom: id={}, name={}");
- return chatRoomChannel.sendCreateChatRoomRequest(id, name);
+ int shard = selectShard(id);
+ log.info(
+ "Sending create-command for chat rooom: id={}, name={}, shard={}",
+ id,
+ name,
+ shard);
+ return infoChannel.sendChatRoomCreatedEvent(id, name, shard);
}
@Override
public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
{
- int shard = selectShard(id);
- return chatRoomChannel
- .getChatRoomInfo(shard, id)
- .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
- id,
- shard,
- chatRoomChannel.getOwnedShards())));
+ return infoChannel
+ .getChatRoomInfo(id)
+ .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
}
@Override
public Flux<ChatRoomInfo> getChatRoomInfo()
{
- return chatRoomChannel.getChatRoomInfo();
+ return infoChannel.getChatRoomInfo();
}
@Override
public Mono<ChatRoomData> getChatRoomData(UUID id)
{
int shard = selectShard(id);
- return chatRoomChannel
+ return dataChannel
.getChatRoomData(shard, id)
.switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
id,
shard,
- chatRoomChannel.getOwnedShards())));
+ dataChannel.getOwnedShards())));
}
- public Flux<ChatRoomData> getChatRoomData()
+ @Override
+ public Mono<String[]> getShardOwners()
{
- return chatRoomChannel.getChatRoomData();
+ return infoChannel.getShardOwners();
}
- int selectShard(UUID chatRoomId)
+ private int selectShard(UUID chatRoomId)
{
byte[] serializedKey = chatRoomId.toString().getBytes();
return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
}
+
+ @Override
+ public String toString()
+ {
+ StringBuffer stringBuffer = new StringBuffer(KafkaChatHomeService.class.getSimpleName());
+ stringBuffer.append(", ");
+ stringBuffer.append(dataChannel.getConsumerGroupMetadata());
+ return stringBuffer.toString();
+ }
}