import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.chat.backend.api.ChatRoomTo;
import de.juplo.kafka.chat.backend.api.MessageTo;
-import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Clock;
-import java.util.UUID;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
private final Path storagePath;
private final Clock clock;
private final int bufferSize;
- private final ShardingStrategy shardingStrategy;
private final ChatRoomServiceFactory factory;
private final ObjectMapper mapper;
return Flux
.from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
.log()
- .map(chatRoomTo ->
- {
- UUID chatRoomId = chatRoomTo.getId();
- int shard = shardingStrategy.selectShard(chatRoomId);
- return new ChatRoom(
+ .map(chatRoomTo -> new ChatRoom(
chatRoomTo.getId(),
chatRoomTo.getName(),
- shard,
clock,
factory.create(readMessages(chatRoomTo)),
- bufferSize);
- });
+ bufferSize));
}
public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)