package de.juplo.kafka.chat.backend.domain;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Clock;
import java.time.LocalDateTime;
+import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
{
public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$");
+ @Getter
+ private final UUID id;
private final ChatRoomService service;
private final Clock clock;
private final int bufferSize;
public ChatRoomData(
+ UUID id,
ChatRoomService service,
Clock clock,
int bufferSize)
{
- log.info("Created ChatRoom with buffer-size {}", bufferSize);
+ log.info("Created ChatRoom {id} with buffer-size {}", id, bufferSize);
+ this.id = id;
this.service = service;
this.clock = clock;
this.bufferSize = bufferSize;
import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
import de.juplo.kafka.chat.backend.api.MessageTo;
import de.juplo.kafka.chat.backend.domain.ChatRoomData;
-import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import lombok.RequiredArgsConstructor;
private final Path storagePath;
private final Clock clock;
private final int bufferSize;
- private final ShardingStrategy shardingStrategy;
private final ChatRoomServiceFactory factory;
private final ObjectMapper mapper;
@Override
- public void writeChatRoomData(Flux<ChatRoomData> chatRoomDataFlux)
+ public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
{
Path path = chatroomsPath();
log.info("Writing chatrooms to {}", path);
.getFactory()
.createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
- chatRoomDataFlux
+ chatRoomInfoFlux
.log()
.doFirst(() ->
{
{
ChatRoomInfoTo infoTo = ChatRoomInfoTo.from(chatroom);
generator.writeObject(infoTo);
- writeMessages(infoTo, chatroom.getMessages());
}
catch (IOException e)
{
}
}
+ @Override
+ public Flux<ChatRoomInfo> readChatRoomInfo()
+ {
+ JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
+ return Flux
+ .from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
+ .log()
+ .map(infoTo -> new ChatRoomInfo(
+ infoTo.getId(),
+ infoTo.getName(),
+ infoTo.getShard()));
+ }
+
+ @Override
+ public void writeChatRoomData(Flux<ChatRoomData> chatRoomDataFlux)
+ {
+ Path path = chatroomsPath();
+ log.info("Writing chatrooms to {}", path);
+ try
+ {
+ Files.createDirectories(storagePath);
+
+ JsonGenerator generator =
+ mapper
+ .getFactory()
+ .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
+
+ chatRoomDataFlux
+ .log()
+ .doFirst(() ->
+ {
+ try
+ {
+ generator.useDefaultPrettyPrinter();
+ generator.writeStartArray();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ })
+ .doOnTerminate(() ->
+ {
+ try
+ {
+ generator.writeEndArray();
+ generator.close();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ })
+ .subscribe(chatRoomData -> writeMessages(
+ chatRoomData.getId(),
+ chatRoomData.getMessages()));
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
@Override
public Flux<ChatRoomData> readChatRoomData()
{
.map(infoTo ->
{
UUID chatRoomId = infoTo.getId();
- int shard = shardingStrategy.selectShard(chatRoomId);
return new ChatRoomData(
+ chatRoomId,
+ factory.create(readMessages(chatRoomId)),
clock,
- factory.create(readMessages(infoTo)),
bufferSize);
});
}
- public void writeMessages(ChatRoomInfoTo infoTo, Flux<Message> messageFlux)
+ public void writeMessages(UUID id, Flux<Message> messageFlux)
{
- Path path = chatroomPath(infoTo);
- log.info("Writing messages for {} to {}", infoTo, path);
+ Path path = chatroomPath(id);
+ log.info("Writing messages for {} to {}", id, path);
try
{
Files.createDirectories(storagePath);
}
}
- public Flux<Message> readMessages(ChatRoomInfoTo infoTo)
+ public Flux<Message> readMessages(UUID id)
{
JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
return Flux
- .from(new JsonFilePublisher<MessageTo>(chatroomPath(infoTo), mapper, type))
+ .from(new JsonFilePublisher<MessageTo>(chatroomPath(id), mapper, type))
.log()
.map(MessageTo::toMessage);
}
return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
}
- Path chatroomPath(ChatRoomInfoTo infoTo)
+ Path chatroomPath(UUID id)
{
- return storagePath.resolve(Path.of(infoTo.getId().toString() + ".json"));
+ return storagePath.resolve(Path.of(id.toString() + ".json"));
}
}
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
-import java.util.List;
-
@AllArgsConstructor
@NoArgsConstructor
@Getter(AccessLevel.PACKAGE)
@Setter(AccessLevel.PACKAGE)
@EqualsAndHashCode(of = { "id" })
-@ToString(of = { "id", "name" })
+@ToString(of = { "id", "shard", "name" })
@Document
public class ChatRoomTo
{
@Id
private String id;
+ private Integer shard;
private String name;
- private List<MessageTo> messages;
- public static ChatRoomTo from(
- ChatRoomInfo chatRoomInfo,
- ChatRoomData chatRoomData)
+ public static ChatRoomTo from(ChatRoomInfo chatRoomInfo)
{
return new ChatRoomTo(
chatRoomInfo.getId().toString(),
- chatRoomInfo.getName(),
- chatRoomData
- .getMessages()
- .map(MessageTo::from)
- .collectList()
- .block());
+ chatRoomInfo.getShard(),
+ chatRoomInfo.getName());
}
}
package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
import de.juplo.kafka.chat.backend.domain.ChatRoomData;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
private final ChatRoomRepository repository;
private final Clock clock;
private final int bufferSize;
- private final ShardingStrategy shardingStrategy;
private final ChatRoomServiceFactory factory;
+ @Override
+ public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
+ {
+ chatRoomInfoFlux
+ .map(ChatRoomTo::from)
+ .subscribe(chatroomTo -> repository.save(chatroomTo));
+ }
+
+ @Override
+ public Flux<ChatRoomInfo> readChatRoomInfo()
+ {
+ return Flux
+ .fromIterable(repository.findAll())
+ .map(chatRoomTo ->
+ {
+ UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
+ return new ChatRoomData(
+ clock,
+ factory.create(
+ Flux
+ .fromIterable(chatRoomTo.getMessages())
+ .map(messageTo -> messageTo.toMessage())),
+ bufferSize);
+ });
+ }
+
@Override
public void writeChatRoomData(Flux<ChatRoomData> chatRoomDataFlux)
{