package de.juplo.kafka.chat.backend.persistence.kafka;
import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomData;
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import de.juplo.kafka.chat.backend.domain.UnknownChatroomException;
+import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.utils.Utils;
}
@Override
- public Mono<ChatRoom> getChatRoom(UUID id)
+ public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
{
int shard = selectShard(id);
return chatRoomChannel
- .getChatRoom(shard, id)
+ .getChatRoomInfo(shard, id)
.switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
id,
shard,
chatRoomChannel.getOwnedShards())));
}
- int selectShard(UUID chatRoomId)
+ @Override
+ public Flux<ChatRoomInfo> getChatRoomInfo()
{
- byte[] serializedKey = chatRoomId.toString().getBytes();
- return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
+ return chatRoomChannel.getChatRoomInfo();
}
@Override
- public Flux<ChatRoom> getChatRooms()
+ public Mono<ChatRoomData> getChatRoomData(UUID id)
{
- return chatRoomChannel.getChatRooms();
+ int shard = selectShard(id);
+ return chatRoomChannel
+ .getChatRoomData(shard, id)
+ .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
+ id,
+ shard,
+ chatRoomChannel.getOwnedShards())));
+ }
+
+ public Flux<ChatRoomData> getChatRoomData()
+ {
+ return chatRoomChannel.getChatRoomData();
+ }
+
+ int selectShard(UUID chatRoomId)
+ {
+ byte[] serializedKey = chatRoomId.toString().getBytes();
+ return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
}
}