import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.chat.backend.api.ChatRoomTo;
+import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
import de.juplo.kafka.chat.backend.api.MessageTo;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.time.Clock;
+import java.util.UUID;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
private final Path storagePath;
- private final Clock clock;
- private final int bufferSize;
- private final ChatRoomServiceFactory factory;
+ private final ShardingStrategy shardingStrategy;
private final ObjectMapper mapper;
@Override
- public void writeChatrooms(Flux<ChatRoom> chatroomFlux)
+ public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
{
Path path = chatroomsPath();
log.info("Writing chatrooms to {}", path);
.getFactory()
.createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
- chatroomFlux
+ chatRoomInfoFlux
.log()
.doFirst(() ->
{
throw new RuntimeException(e);
}
})
- .subscribe(chatroom ->
+ .subscribe(chatRoomInfo ->
{
try
{
- ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom);
- generator.writeObject(chatroomTo);
- writeMessages(chatroomTo, chatroom.getMessages());
+ ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo);
+ generator.writeObject(chatRoomInfoTo);
}
catch (IOException e)
{
}
@Override
- public Flux<ChatRoom> readChatrooms()
+ public Flux<ChatRoomInfo> readChatRoomInfo()
{
- JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class);
+ JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
return Flux
- .from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
+ .from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
.log()
- .map(chatRoomTo -> new ChatRoom(
- chatRoomTo.getId(),
- chatRoomTo.getName(),
- clock,
- factory.create(readMessages(chatRoomTo)),
- bufferSize));
+ .map(chatRoomInfoTo ->
+ {
+ UUID chatRoomId = chatRoomInfoTo.getId();
+ int shard = shardingStrategy.selectShard(chatRoomId);
+
+ log.info(
+ "{} - old shard: {}, new shard: {}",
+ chatRoomId,
+ chatRoomInfoTo.getShard(),
+ shard);
+
+ return new ChatRoomInfo(
+ chatRoomId,
+ chatRoomInfoTo.getName(),
+ shard);
+ });
}
@Override
- public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
+ public void writeChatRoomData(
+ UUID chatRoomId,
+ Flux<Message> messageFlux)
{
- Path path = chatroomPath(chatroomTo);
- log.info("Writing messages for {} to {}", chatroomTo, path);
+ Path path = chatroomPath(chatRoomId);
+ log.info("Writing messages for {} to {}", chatRoomId, path);
try
{
Files.createDirectories(storagePath);
}
@Override
- public Flux<Message> readMessages(ChatRoomTo chatroomTo)
+ public Flux<Message> readChatRoomData(UUID chatRoomId)
{
JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
return Flux
- .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
+ .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatRoomId), mapper, type))
.log()
.map(MessageTo::toMessage);
}
return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
}
- Path chatroomPath(ChatRoomTo chatroomTo)
+ Path chatroomPath(UUID id)
{
- return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));
+ return storagePath.resolve(Path.of(id.toString() + ".json"));
}
}