package de.juplo.kafka.chat.backend.persistence.kafka;
import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.UnknownChatroomException;
+import de.juplo.kafka.chat.backend.domain.ChatRoomData;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.utils.Utils;
private final ChatRoomChannel chatRoomChannel;
+
+ @Override
+ public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
+ {
+ log.info("Sending create-command for chat rooom: id={}, name={}");
+ return chatRoomChannel.sendCreateChatRoomRequest(id, name);
+ }
+
@Override
- public Mono<ChatRoom> getChatRoom(UUID id)
+ public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
{
int shard = selectShard(id);
return chatRoomChannel
- .getChatRoom(shard, id)
+ .getChatRoomInfo(shard, id)
.switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
id,
shard,
chatRoomChannel.getOwnedShards())));
}
- int selectShard(UUID chatRoomId)
+ @Override
+ public Flux<ChatRoomInfo> getChatRoomInfo()
{
- byte[] serializedKey = chatRoomId.toString().getBytes();
- return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
+ return chatRoomChannel.getChatRoomInfo();
}
@Override
- public Flux<ChatRoom> getChatRooms()
+ public Mono<ChatRoomData> getChatRoomData(UUID id)
+ {
+ int shard = selectShard(id);
+ return chatRoomChannel
+ .getChatRoomData(shard, id)
+ .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
+ id,
+ shard,
+ chatRoomChannel.getOwnedShards())));
+ }
+
+ public Flux<ChatRoomData> getChatRoomData()
{
- return chatRoomChannel.getChatRooms();
+ return chatRoomChannel.getChatRoomData();
+ }
+
+ int selectShard(UUID chatRoomId)
+ {
+ byte[] serializedKey = chatRoomId.toString().getBytes();
+ return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
}
}