import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
log.info("Exiting normally");
}
- void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
+ private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
{
for (ConsumerRecord<String, AbstractMessageTo> record : records)
{
}
}
- void createChatRoom(
+ private void createChatRoom(
UUID chatRoomId,
CommandCreateChatRoomTo createChatRoomRequestTo,
int partition)
}
- void createChatRoom(ChatRoomInfo chatRoomInfo)
+ private void createChatRoom(ChatRoomInfo chatRoomInfo)
{
UUID id = chatRoomInfo.getId();
String name = chatRoomInfo.getName();
putChatRoom(chatRoom);
}
- void loadChatMessage(
+ private void loadChatMessage(
UUID chatRoomId,
LocalDateTime timestamp,
long offset,
kafkaChatRoomService.persistMessage(message);
}
- boolean isLoadingCompleted()
+ private boolean isLoadingCompleted()
{
return IntStream
.range(0, numShards)
.allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
}
- void pauseAllOwnedPartions()
+ private void pauseAllOwnedPartions()
{
consumer.pause(IntStream
.range(0, numShards)
Mono<ChatRoom> getChatRoom(int shard, UUID id)
{
+ if (loadInProgress)
+ {
+ throw new LoadInProgressException(shard);
+ }
+
+ if (!isShardOwned[shard])
+ {
+ throw new ShardNotOwnedException(shard);
+ }
+
return Mono.justOrEmpty(chatrooms[shard].get(id));
}