From: Kai Moritz Date: Sun, 3 Sep 2023 23:41:45 +0000 (+0200) Subject: refactor: Renamed `ChatRoom` into `ChatRoomData` - Aligned Code X-Git-Tag: rebase--2023-09-05--23-53~22 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=62bed2f66dfacc519531e14c3b2286863241946f;p=demos%2Fkafka%2Fchat refactor: Renamed `ChatRoom` into `ChatRoomData` - Aligned Code --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java index 9997b94e..c4ab80a4 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java @@ -1,14 +1,16 @@ package de.juplo.kafka.chat.backend.persistence; -import de.juplo.kafka.chat.backend.domain.ChatRoomData; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import de.juplo.kafka.chat.backend.domain.Message; import reactor.core.publisher.Flux; +import java.util.UUID; + public interface StorageStrategy { void writeChatRoomInfo(Flux chatRoomInfoFlux); Flux readChatRoomInfo(); - void writeChatRoomData(Flux chatRoomDataFlux); - Flux readChatRoomData(); + void writeChatRoomData(UUID chatRoomId, Flux messageFlux); + Flux readChatRoomData(UUID chatRoomId); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java index dd8f7d22..76080c9c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java @@ -3,6 +3,7 @@ package de.juplo.kafka.chat.backend.persistence.inmemory; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ChatHome; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -28,10 +29,12 @@ public class InMemoryServicesConfiguration ChatHome noneShardingChatHome( ChatBackendProperties properties, StorageStrategy storageStrategy, + ChatRoomServiceFactory chatRoomServiceFactory, Clock clock) { return new SimpleChatHome( - storageStrategy.readChatRoomData(), + storageStrategy, + chatRoomServiceFactory, clock, properties.getChatroomBufferSize()); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java index f0067fa6..961cea2e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java @@ -1,6 +1,8 @@ package de.juplo.kafka.chat.backend.persistence.inmemory; import de.juplo.kafka.chat.backend.domain.*; +import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -21,23 +23,23 @@ public class SimpleChatHome implements ChatHome public SimpleChatHome( - Flux chatRoomInfoFlux, - ChatRoomService chatRoomService, + StorageStrategy storageStrategy, + ChatRoomServiceFactory chatRoomServiceFactory, Clock clock, int bufferSize) { this( null, - chatRoomInfoFlux, - chatRoomService, + storageStrategy, + chatRoomServiceFactory, clock, bufferSize); } public SimpleChatHome( Integer shard, - Flux chatRoomInfoFlux, - ChatRoomService chatRoomService, + StorageStrategy storageStrategy, + ChatRoomServiceFactory chatRoomServiceFactory, Clock clock, int bufferSize) { @@ -46,7 +48,8 @@ public class SimpleChatHome implements ChatHome this.shard = shard; this.chatRoomInfo = new HashMap<>(); this.chatRoomData = new HashMap<>(); - chatRoomInfoFlux + storageStrategy + .readChatRoomInfo() .filter(info -> { if (shard == null || info.getShard() == shard) @@ -65,10 +68,17 @@ public class SimpleChatHome implements ChatHome .toStream() .forEach(info -> { - chatRoomInfo.put(info.getId(), info); + UUID chatRoomId = info.getId(); + chatRoomInfo.put(chatRoomId, info); + Flux messageFlux = + storageStrategy.readChatRoomData(chatRoomId); chatRoomData.put( info.getId(), - new ChatRoomData(chatRoomService, clock, bufferSize)); + new ChatRoomData( + chatRoomId, + chatRoomServiceFactory.create(messageFlux), + clock, + bufferSize)); }); this.clock = clock; this.bufferSize = bufferSize; @@ -81,7 +91,7 @@ public class SimpleChatHome implements ChatHome log.info("Creating ChatRoom with buffer-size {}", bufferSize); ChatRoomService service = new InMemoryChatRoomService(Flux.empty()); ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard); - ChatRoomData chatRoomData = new ChatRoomData(service, clock, bufferSize); + ChatRoomData chatRoomData = new ChatRoomData(id, service, clock, bufferSize); this.chatRoomData.put(id, chatRoomData); return Mono.just(chatRoomInfo); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java index ca0f851b..b0bb7d86 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java @@ -79,12 +79,12 @@ public class FilesStorageStrategy implements StorageStrategy throw new RuntimeException(e); } }) - .subscribe(chatroom -> + .subscribe(chatRoomInfo -> { try { - ChatRoomInfoTo infoTo = ChatRoomInfoTo.from(chatroom); - generator.writeObject(infoTo); + ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo); + generator.writeObject(chatRoomInfoTo); } catch (IOException e) { @@ -124,77 +124,12 @@ public class FilesStorageStrategy implements StorageStrategy } @Override - public void writeChatRoomData(Flux chatRoomDataFlux) + public void writeChatRoomData( + UUID chatRoomId, + Flux messageFlux) { - Path path = chatroomsPath(); - log.info("Writing chatrooms to {}", path); - try - { - Files.createDirectories(storagePath); - - JsonGenerator generator = - mapper - .getFactory() - .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING)); - - chatRoomDataFlux - .log() - .doFirst(() -> - { - try - { - generator.useDefaultPrettyPrinter(); - generator.writeStartArray(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - }) - .doOnTerminate(() -> - { - try - { - generator.writeEndArray(); - generator.close(); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - }) - .subscribe(chatRoomData -> writeMessages( - chatRoomData.getId(), - chatRoomData.getMessages())); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - - @Override - public Flux readChatRoomData() - { - JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class); - return Flux - .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) - .log() - .map(infoTo -> - { - UUID chatRoomId = infoTo.getId(); - return new ChatRoomData( - chatRoomId, - factory.create(readMessages(chatRoomId)), - clock, - bufferSize); - }); - } - - public void writeMessages(UUID id, Flux messageFlux) - { - Path path = chatroomPath(id); - log.info("Writing messages for {} to {}", id, path); + Path path = chatroomPath(chatRoomId); + log.info("Writing messages for {} to {}", chatRoomId, path); try { Files.createDirectories(storagePath); @@ -249,11 +184,12 @@ public class FilesStorageStrategy implements StorageStrategy } } - public Flux readMessages(UUID id) + @Override + public Flux readChatRoomData(UUID chatRoomId) { JavaType type = mapper.getTypeFactory().constructType(MessageTo.class); return Flux - .from(new JsonFilePublisher(chatroomPath(id), mapper, type)) + .from(new JsonFilePublisher(chatroomPath(chatRoomId), mapper, type)) .log() .map(MessageTo::toMessage); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageRepository.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageRepository.java index d80d5fe4..c2a7f0dc 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageRepository.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageRepository.java @@ -2,7 +2,10 @@ package de.juplo.kafka.chat.backend.persistence.storage.mongodb; import org.springframework.data.mongodb.repository.MongoRepository; +import java.util.List; + public interface MessageRepository extends MongoRepository { + List findByChatRoomId(String chatRoomId); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java index 9ef38a71..c2b014e0 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java @@ -4,6 +4,7 @@ import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; +import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -21,15 +22,18 @@ public class MongoDbStorageConfiguration @Bean public StorageStrategy storageStrategy( ChatRoomRepository chatRoomRepository, + MessageRepository messageRepository, ChatBackendProperties properties, Clock clock, - ShardingStrategy shardingStrategy) + ShardingStrategy shardingStrategy, + ChatRoomServiceFactory chatRoomServiceFactory) { return new MongoDbStorageStrategy( chatRoomRepository, + messageRepository, clock, properties.getChatroomBufferSize(), shardingStrategy, - messageFlux -> new InMemoryChatRoomService(messageFlux)); + chatRoomServiceFactory); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java index 41b1d208..8a172c16 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java @@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.persistence.storage.mongodb; import de.juplo.kafka.chat.backend.domain.ChatRoomData; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory; @@ -57,29 +58,18 @@ public class MongoDbStorageStrategy implements StorageStrategy } @Override - public void writeChatRoomData(Flux chatRoomDataFlux) + public void writeChatRoomData(UUID chatRoomId, Flux messageFlux) { - chatRoomDataFlux - .flatMap(ChatRoomTo::from) - .subscribe(chatroomTo -> chatRoomRepository.save(chatroomTo)); + messageFlux + .map(message -> MessageTo.from(message)) + .subscribe(messageTo -> messageRepository.save(messageTo)); // TODO: Namespace } @Override - public Flux readChatRoomData() + public Flux readChatRoomData(UUID chatRoomId) { return Flux - .fromIterable(chatRoomRepository.findAll()) - .map(chatRoomTo -> - { - UUID chatRoomId = UUID.fromString(chatRoomTo.getId()); - int shard = shardingStrategy.selectShard(chatRoomId); - return new ChatRoomData( - clock, - factory.create( - Flux - .fromIterable(chatRoomTo.getMessages()) - .map(messageTo -> messageTo.toMessage())), - bufferSize); - }); + .fromIterable(messageRepository.findByChatRoomId(chatRoomId.toString())) + .map(messageTo -> messageTo.toMessage()); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/nostorage/NoStorageStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/nostorage/NoStorageStorageConfiguration.java index 84ac8deb..ab24bb8a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/nostorage/NoStorageStorageConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/nostorage/NoStorageStorageConfiguration.java @@ -1,7 +1,7 @@ package de.juplo.kafka.chat.backend.persistence.storage.nostorage; -import de.juplo.kafka.chat.backend.domain.ChatRoomData; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -11,6 +11,8 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import reactor.core.publisher.Flux; +import java.util.UUID; + @ConditionalOnProperty( prefix = "chat.backend.inmemory", @@ -39,10 +41,10 @@ public class NoStorageStorageConfiguration } @Override - public void writeChatRoomData(Flux chatRoomDataFlux) {} + public void writeChatRoomData(UUID chatRoomId, Flux messageFlux) {} @Override - public Flux readChatRoomData() + public Flux readChatRoomData(UUID chatRoomId) { return Flux.empty(); }