import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.chat.backend.api.ChatRoomTo;
+import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
import de.juplo.kafka.chat.backend.api.MessageTo;
import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
{
try
{
- ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom);
- generator.writeObject(chatroomTo);
- writeMessages(chatroomTo, chatroom.getMessages());
+ ChatRoomInfoTo infoTo = ChatRoomInfoTo.from(chatroom);
+ generator.writeObject(infoTo);
+ writeMessages(infoTo, chatroom.getMessages());
}
catch (IOException e)
{
@Override
public Flux<ChatRoom> read()
{
- JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class);
+ JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
return Flux
- .from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
+ .from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
.log()
- .map(chatRoomTo ->
+ .map(infoTo ->
{
- UUID chatRoomId = chatRoomTo.getId();
+ UUID chatRoomId = infoTo.getId();
int shard = shardingStrategy.selectShard(chatRoomId);
return new ChatRoom(
- chatRoomTo.getId(),
- chatRoomTo.getName(),
+ infoTo.getId(),
+ infoTo.getName(),
shard,
clock,
- factory.create(readMessages(chatRoomTo)),
+ factory.create(readMessages(infoTo)),
bufferSize);
});
}
- public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
+ public void writeMessages(ChatRoomInfoTo infoTo, Flux<Message> messageFlux)
{
- Path path = chatroomPath(chatroomTo);
- log.info("Writing messages for {} to {}", chatroomTo, path);
+ Path path = chatroomPath(infoTo);
+ log.info("Writing messages for {} to {}", infoTo, path);
try
{
Files.createDirectories(storagePath);
}
}
- public Flux<Message> readMessages(ChatRoomTo chatroomTo)
+ public Flux<Message> readMessages(ChatRoomInfoTo infoTo)
{
JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
return Flux
- .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
+ .from(new JsonFilePublisher<MessageTo>(chatroomPath(infoTo), mapper, type))
.log()
.map(MessageTo::toMessage);
}
return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
}
- Path chatroomPath(ChatRoomTo chatroomTo)
+ Path chatroomPath(ChatRoomInfoTo infoTo)
{
- return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));
+ return storagePath.resolve(Path.of(infoTo.getId().toString() + ".json"));
}
}