From: Kai Moritz Date: Sun, 3 Sep 2023 20:52:19 +0000 (+0200) Subject: WIP:refactor: Renamed `ChatRoom` into `ChatRoomData` - Aligned Code X-Git-Tag: rebase--2023-09-05--23-53~25 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=425cd2f4fae570f6104bee5b6297a22b5fb63a0c;p=demos%2Fkafka%2Fchat WIP:refactor: Renamed `ChatRoom` into `ChatRoomData` - Aligned Code --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java index cf44802f..0031bb0d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java @@ -1,5 +1,6 @@ package de.juplo.kafka.chat.backend.domain; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -8,6 +9,7 @@ import reactor.core.publisher.SynchronousSink; import java.time.Clock; import java.time.LocalDateTime; +import java.util.UUID; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -17,6 +19,8 @@ public class ChatRoomData { public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$"); + @Getter + private final UUID id; private final ChatRoomService service; private final Clock clock; private final int bufferSize; @@ -24,11 +28,13 @@ public class ChatRoomData public ChatRoomData( + UUID id, ChatRoomService service, Clock clock, int bufferSize) { - log.info("Created ChatRoom with buffer-size {}", bufferSize); + log.info("Created ChatRoom {id} with buffer-size {}", id, bufferSize); + this.id = id; this.service = service; this.clock = clock; this.bufferSize = bufferSize; diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java index 860d9ffb..bcad3f37 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java @@ -6,7 +6,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; import de.juplo.kafka.chat.backend.api.MessageTo; import de.juplo.kafka.chat.backend.domain.ChatRoomData; -import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import lombok.RequiredArgsConstructor; @@ -33,13 +33,12 @@ public class FilesStorageStrategy implements StorageStrategy private final Path storagePath; private final Clock clock; private final int bufferSize; - private final ShardingStrategy shardingStrategy; private final ChatRoomServiceFactory factory; private final ObjectMapper mapper; @Override - public void writeChatRoomData(Flux chatRoomDataFlux) + public void writeChatRoomInfo(Flux chatRoomInfoFlux) { Path path = chatroomsPath(); log.info("Writing chatrooms to {}", path); @@ -52,7 +51,7 @@ public class FilesStorageStrategy implements StorageStrategy .getFactory() .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING)); - chatRoomDataFlux + chatRoomInfoFlux .log() .doFirst(() -> { @@ -84,7 +83,6 @@ public class FilesStorageStrategy implements StorageStrategy { ChatRoomInfoTo infoTo = ChatRoomInfoTo.from(chatroom); generator.writeObject(infoTo); - writeMessages(infoTo, chatroom.getMessages()); } catch (IOException e) { @@ -98,6 +96,69 @@ public class FilesStorageStrategy implements StorageStrategy } } + @Override + public Flux readChatRoomInfo() + { + JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class); + return Flux + .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) + .log() + .map(infoTo -> new ChatRoomInfo( + infoTo.getId(), + infoTo.getName(), + infoTo.getShard())); + } + + @Override + public void writeChatRoomData(Flux chatRoomDataFlux) + { + Path path = chatroomsPath(); + log.info("Writing chatrooms to {}", path); + try + { + Files.createDirectories(storagePath); + + JsonGenerator generator = + mapper + .getFactory() + .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING)); + + chatRoomDataFlux + .log() + .doFirst(() -> + { + try + { + generator.useDefaultPrettyPrinter(); + generator.writeStartArray(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }) + .doOnTerminate(() -> + { + try + { + generator.writeEndArray(); + generator.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }) + .subscribe(chatRoomData -> writeMessages( + chatRoomData.getId(), + chatRoomData.getMessages())); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + @Override public Flux readChatRoomData() { @@ -108,18 +169,18 @@ public class FilesStorageStrategy implements StorageStrategy .map(infoTo -> { UUID chatRoomId = infoTo.getId(); - int shard = shardingStrategy.selectShard(chatRoomId); return new ChatRoomData( + chatRoomId, + factory.create(readMessages(chatRoomId)), clock, - factory.create(readMessages(infoTo)), bufferSize); }); } - public void writeMessages(ChatRoomInfoTo infoTo, Flux messageFlux) + public void writeMessages(UUID id, Flux messageFlux) { - Path path = chatroomPath(infoTo); - log.info("Writing messages for {} to {}", infoTo, path); + Path path = chatroomPath(id); + log.info("Writing messages for {} to {}", id, path); try { Files.createDirectories(storagePath); @@ -174,11 +235,11 @@ public class FilesStorageStrategy implements StorageStrategy } } - public Flux readMessages(ChatRoomInfoTo infoTo) + public Flux readMessages(UUID id) { JavaType type = mapper.getTypeFactory().constructType(MessageTo.class); return Flux - .from(new JsonFilePublisher(chatroomPath(infoTo), mapper, type)) + .from(new JsonFilePublisher(chatroomPath(id), mapper, type)) .log() .map(MessageTo::toMessage); } @@ -188,8 +249,8 @@ public class FilesStorageStrategy implements StorageStrategy return storagePath.resolve(Path.of(CHATROOMS_FILENAME)); } - Path chatroomPath(ChatRoomInfoTo infoTo) + Path chatroomPath(UUID id) { - return storagePath.resolve(Path.of(infoTo.getId().toString() + ".json")); + return storagePath.resolve(Path.of(id.toString() + ".json")); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java index f3fae327..58f5bce5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java @@ -6,34 +6,26 @@ import lombok.*; import org.springframework.data.annotation.Id; import org.springframework.data.mongodb.core.mapping.Document; -import java.util.List; - @AllArgsConstructor @NoArgsConstructor @Getter(AccessLevel.PACKAGE) @Setter(AccessLevel.PACKAGE) @EqualsAndHashCode(of = { "id" }) -@ToString(of = { "id", "name" }) +@ToString(of = { "id", "shard", "name" }) @Document public class ChatRoomTo { @Id private String id; + private Integer shard; private String name; - private List messages; - public static ChatRoomTo from( - ChatRoomInfo chatRoomInfo, - ChatRoomData chatRoomData) + public static ChatRoomTo from(ChatRoomInfo chatRoomInfo) { return new ChatRoomTo( chatRoomInfo.getId().toString(), - chatRoomInfo.getName(), - chatRoomData - .getMessages() - .map(MessageTo::from) - .collectList() - .block()); + chatRoomInfo.getShard(), + chatRoomInfo.getName()); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java index 7952a2b2..772c6e42 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java @@ -1,6 +1,7 @@ package de.juplo.kafka.chat.backend.persistence.storage.mongodb; import de.juplo.kafka.chat.backend.domain.ChatRoomData; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory; @@ -19,10 +20,35 @@ public class MongoDbStorageStrategy implements StorageStrategy private final ChatRoomRepository repository; private final Clock clock; private final int bufferSize; - private final ShardingStrategy shardingStrategy; private final ChatRoomServiceFactory factory; + @Override + public void writeChatRoomInfo(Flux chatRoomInfoFlux) + { + chatRoomInfoFlux + .map(ChatRoomTo::from) + .subscribe(chatroomTo -> repository.save(chatroomTo)); + } + + @Override + public Flux readChatRoomInfo() + { + return Flux + .fromIterable(repository.findAll()) + .map(chatRoomTo -> + { + UUID chatRoomId = UUID.fromString(chatRoomTo.getId()); + return new ChatRoomData( + clock, + factory.create( + Flux + .fromIterable(chatRoomTo.getMessages()) + .map(messageTo -> messageTo.toMessage())), + bufferSize); + }); + } + @Override public void writeChatRoomData(Flux chatRoomDataFlux) {