import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.chat.backend.api.ChatRoomTo;
import de.juplo.kafka.chat.backend.api.MessageTo;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.chat.backend.api.ChatRoomTo;
import de.juplo.kafka.chat.backend.api.MessageTo;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
private final Path storagePath;
private final Clock clock;
private final int bufferSize;
private final Path storagePath;
private final Clock clock;
private final int bufferSize;
{
Path path = chatroomsPath();
log.info("Writing chatrooms to {}", path);
{
Path path = chatroomsPath();
log.info("Writing chatrooms to {}", path);
{
JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class);
return Flux
.from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
.log()
{
JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class);
return Flux
.from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
.log()
- .map(chatRoomTo -> new ChatRoom(
- chatRoomTo.getId(),
- chatRoomTo.getName(),
- clock,
- factory.create(readMessages(chatRoomTo)),
- bufferSize));
+ .map(chatRoomTo ->
+ {
+ UUID chatRoomId = chatRoomTo.getId();
+ int shard = shardingStrategy.selectShard(chatRoomId);
+ return new ChatRoom(
+ chatRoomTo.getId(),
+ chatRoomTo.getName(),
+ shard,
+ clock,
+ factory.create(readMessages(chatRoomTo)),
+ bufferSize);
+ });
public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
{
Path path = chatroomPath(chatroomTo);
public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
{
Path path = chatroomPath(chatroomTo);
public Flux<Message> readMessages(ChatRoomTo chatroomTo)
{
JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
public Flux<Message> readMessages(ChatRoomTo chatroomTo)
{
JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);