From: Kai Moritz Date: Thu, 5 Jan 2023 15:09:04 +0000 (+0100) Subject: feat: The chats are stored as local json-files X-Git-Tag: TEST~56 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=0efe7c3edff3a8c4aea6cb2d550a6f4ac08f3a55;p=demos%2Fkafka%2Fchat feat: The chats are stored as local json-files --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java index 7f6f213f..37eeeee4 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java @@ -1,15 +1,29 @@ package de.juplo.kafka.chat.backend; +import de.juplo.kafka.chat.backend.domain.ChatHome; +import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import jakarta.annotation.PreDestroy; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; - -import java.time.Clock; +import reactor.core.publisher.Flux; @SpringBootApplication public class ChatBackendApplication { + @Autowired + ChatHome chatHome; + @Autowired + StorageStrategy storageStrategy; + + @PreDestroy + public void onExit() + { + storageStrategy.writeChatrooms(Flux.fromIterable(chatHome.list())); + } + public static void main(String[] args) { SpringApplication.run(ChatBackendApplication.class, args); diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java index 835627a0..6387c6ef 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java @@ -1,22 +1,46 @@ package de.juplo.kafka.chat.backend; import de.juplo.kafka.chat.backend.domain.ChatHome; +import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.chat.backend.domain.ChatroomFactory; +import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.persistence.InMemoryChatroomFactory; import de.juplo.kafka.chat.backend.persistence.InMemoryPersistenceStrategy; +import de.juplo.kafka.chat.backend.persistence.LocalJsonFilesStorageStrategy; +import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.nio.file.Paths; import java.time.Clock; +import java.util.LinkedHashMap; @Configuration +@EnableConfigurationProperties(ChatBackendProperties.class) public class ChatBackendConfiguration { @Bean - public ChatHome chatHome(ChatroomFactory chatroomFactory) + public ChatHome chatHome( + ChatroomFactory chatroomFactory, + StorageStrategy storageStrategy) { - return new ChatHome(chatroomFactory); + return new ChatHome( + storageStrategy.readChatrooms().collectMap(chatroom -> chatroom.getId()).block(), + chatroomFactory); + } + + @Bean + public StorageStrategy storageStrategy( + ChatBackendProperties properties, + ObjectMapper mapper, + ChatroomFactory chatroomFactory) + { + return new LocalJsonFilesStorageStrategy( + Paths.get(properties.getDatadir()), + mapper, + chatroomFactory); } @Bean @@ -28,7 +52,7 @@ public class ChatBackendConfiguration @Bean InMemoryPersistenceStrategy persistenceStrategy() { - return new InMemoryPersistenceStrategy(); + return new InMemoryPersistenceStrategy(new LinkedHashMap<>()); } @Bean diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java new file mode 100644 index 00000000..598e8ce0 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -0,0 +1,16 @@ +package de.juplo.kafka.chat.backend; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; + +import java.nio.file.Paths; + + +@ConfigurationProperties("chat.backend") +@Getter +@Setter +public class ChatBackendProperties +{ + private String datadir = Paths.get(System.getProperty("java.io.tmpdir"),"chat", "backend").toString(); +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/MessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/api/MessageTo.java index 53d2319a..3525d563 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/MessageTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/MessageTo.java @@ -17,6 +17,11 @@ public class MessageTo private String user; private String text; + public Message toMessage() + { + return new Message(Message.MessageKey.of(user, id), serial, time, text); + } + public static MessageTo from(Message message) { return diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java index 84b9e946..2b5152c9 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java @@ -8,7 +8,7 @@ import java.util.*; @RequiredArgsConstructor public class ChatHome { - private final Map chatrooms = new HashMap<>(); + private final Map chatrooms; private final ChatroomFactory factory; diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/Chatroom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/Chatroom.java index c05fda0a..8f4a7975 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/Chatroom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/Chatroom.java @@ -44,6 +44,11 @@ public class Chatroom return sink.asFlux(); } + public Flux getMessages() + { + return getMessages(0, Long.MAX_VALUE); + } + public Flux getMessages(long first, long last) { return persistence.getMessages(first, last); diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatroomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatroomFactory.java index 02ce3d27..f5e8c19b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatroomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatroomFactory.java @@ -3,7 +3,8 @@ package de.juplo.kafka.chat.backend.domain; import java.util.UUID; -public interface ChatroomFactory +public interface ChatroomFactory { Chatroom createChatroom(UUID id, String name); + Chatroom restoreChatroom(UUID id, String name, Strategy strategy); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/PersistenceStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/domain/PersistenceStrategy.java index 452a62d9..d3a8364d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/PersistenceStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/PersistenceStrategy.java @@ -1,6 +1,5 @@ package de.juplo.kafka.chat.backend.domain; -import lombok.Value; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/ChatroomInfo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/ChatroomInfo.java new file mode 100644 index 00000000..6536175d --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/ChatroomInfo.java @@ -0,0 +1,22 @@ +package de.juplo.kafka.chat.backend.persistence; + +import de.juplo.kafka.chat.backend.domain.Chatroom; +import lombok.Data; + +import java.util.UUID; + +@Data +public class ChatroomInfo +{ + private UUID id; + private String name; + + + public static ChatroomInfo from(Chatroom chatroom) + { + ChatroomInfo info = new ChatroomInfo(); + info.id = chatroom.getId(); + info.name = chatroom.getName(); + return info; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatroomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatroomFactory.java index b2b7f1df..231eb37c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatroomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatroomFactory.java @@ -7,8 +7,9 @@ import lombok.RequiredArgsConstructor; import java.util.UUID; + @RequiredArgsConstructor -public class InMemoryChatroomFactory implements ChatroomFactory +public class InMemoryChatroomFactory implements ChatroomFactory { private final PersistenceStrategy persistenceStrategy; @@ -18,4 +19,13 @@ public class InMemoryChatroomFactory implements ChatroomFactory { return new Chatroom(id, name, persistenceStrategy); } + + @Override + public Chatroom restoreChatroom( + UUID id, + String name, + InMemoryPersistenceStrategy persistenceStrategy) + { + return new Chatroom(id, name, persistenceStrategy); + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryPersistenceStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryPersistenceStrategy.java index 454419ed..19ef3431 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryPersistenceStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryPersistenceStrategy.java @@ -3,7 +3,6 @@ package de.juplo.kafka.chat.backend.persistence; import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.domain.MessageMutationException; import de.juplo.kafka.chat.backend.domain.PersistenceStrategy; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -12,11 +11,17 @@ import java.time.LocalDateTime; import java.util.LinkedHashMap; -@RequiredArgsConstructor @Slf4j public class InMemoryPersistenceStrategy implements PersistenceStrategy { - private final LinkedHashMap messages = new LinkedHashMap<>(); + private final LinkedHashMap messages; + + + public InMemoryPersistenceStrategy(LinkedHashMap messages) + { + this.messages = messages; + } + @Override public Mono persistMessage( diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java new file mode 100644 index 00000000..40fa2bc4 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java @@ -0,0 +1,270 @@ +package de.juplo.kafka.chat.backend.persistence; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.chat.backend.api.MessageTo; +import de.juplo.kafka.chat.backend.domain.Chatroom; +import de.juplo.kafka.chat.backend.domain.ChatroomFactory; +import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.domain.MessageMutationException; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.util.LinkedHashMap; +import java.util.function.Function; +import java.util.stream.Collector; +import java.util.stream.Collectors; + +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; + + +@RequiredArgsConstructor +@Slf4j +public class LocalJsonFilesStorageStrategy implements StorageStrategy +{ + public static final String CHATROOMS_FILENAME = "chatrooms.json"; + + + private final Path storagePath; + private final ObjectMapper mapper; + private final ChatroomFactory chatroomFactory; + + + @Override + public void writeChatrooms(Flux chatroomFlux) + { + Path path = chatroomsPath(); + log.info("Writing chatrooms to {}", path); + try + { + Files.createDirectories(storagePath); + + JsonGenerator generator = + mapper + .getFactory() + .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING)); + + chatroomFlux + .log() + .doFirst(() -> + { + try + { + generator.useDefaultPrettyPrinter(); + generator.writeStartArray(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }) + .doOnTerminate(() -> + { + try + { + generator.writeEndArray(); + generator.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }) + .subscribe(chatroom -> + { + try + { + ChatroomInfo chatroomInfo = ChatroomInfo.from(chatroom); + generator.writeObject(chatroomInfo); + writeMessages(chatroomInfo, chatroom.getMessages()); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + @Override + public Flux readChatrooms() + { + Path path = chatroomsPath(); + log.info("Reading chatrooms from {}", path); + try + { + JsonParser parser = + mapper + .getFactory() + .createParser(Files.newBufferedReader(path)); + + if (parser.nextToken() != JsonToken.START_ARRAY) + throw new IllegalStateException("Expected content to be an array"); + + Sinks.Many many = Sinks.many().unicast().onBackpressureBuffer(); + + while (parser.nextToken() != JsonToken.END_ARRAY) + { + many + .tryEmitNext(mapper.readValue(parser, ChatroomInfo.class)) + .orThrow(); + } + + many.tryEmitComplete().orThrow(); + + return many + .asFlux() + .map(chatroomInfo -> + { + LinkedHashMap messages = + readMessages(chatroomInfo) + .collect(Collectors.toMap( + Message::getKey, + Function.identity(), + (existing, message) -> + { + if (!message.equals(existing)) + throw new MessageMutationException(message, existing); + return existing; + }, + LinkedHashMap::new)) + .block(); + InMemoryPersistenceStrategy strategy = new InMemoryPersistenceStrategy(messages); + return chatroomFactory.restoreChatroom(chatroomInfo.getId(), chatroomInfo.getName(), strategy); + }); + } + catch (NoSuchFileException e) + { + log.info("{} does not exist - starting with empty ChatHome", path); + return Flux.empty(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + @Override + public void writeMessages(ChatroomInfo chatroomInfo, Flux messageFlux) + { + Path path = chatroomPath(chatroomInfo); + log.info("Writing messages for {} to {}", chatroomInfo, path); + try + { + Files.createDirectories(storagePath); + + JsonGenerator generator = + mapper + .getFactory() + .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING)); + + messageFlux + .log() + .doFirst(() -> + { + try + { + generator.useDefaultPrettyPrinter(); + generator.writeStartArray(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }) + .doOnTerminate(() -> + { + try + { + generator.writeEndArray(); + generator.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }) + .subscribe(message -> + { + try + { + MessageTo messageTo = MessageTo.from(message); + generator.writeObject(messageTo); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + }); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + @Override + public Flux readMessages(ChatroomInfo chatroomInfo) + { + Path path = chatroomPath(chatroomInfo); + log.info("Reading messages for {} from {}", chatroomInfo, path); + try + { + JsonParser parser = + mapper + .getFactory() + .createParser(Files.newBufferedReader(path)); + + if (parser.nextToken() != JsonToken.START_ARRAY) + throw new IllegalStateException("Expected content to be an array"); + + Sinks.Many many = Sinks.many().unicast().onBackpressureBuffer(); + + while (parser.nextToken() != JsonToken.END_ARRAY) + { + many + .tryEmitNext(mapper.readValue(parser, MessageTo.class).toMessage()) + .orThrow(); + } + + many.tryEmitComplete().orThrow(); + + return many.asFlux(); + } + catch (NoSuchFileException e) + { + log.info( + "{} does not exist - starting with empty chat for {}", + path, + chatroomInfo); + return Flux.empty(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + Path chatroomsPath() + { + return storagePath.resolve(Path.of(CHATROOMS_FILENAME)); + } + + Path chatroomPath(ChatroomInfo chatroomInfo) + { + return storagePath.resolve(Path.of(chatroomInfo.getId().toString() + ".json")); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java new file mode 100644 index 00000000..3c268ca6 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.chat.backend.persistence; + +import de.juplo.kafka.chat.backend.domain.Chatroom; +import de.juplo.kafka.chat.backend.domain.Message; +import reactor.core.publisher.Flux; + + +public interface StorageStrategy +{ + void writeChatrooms(Flux chatroomFlux); + Flux readChatrooms(); + void writeMessages(ChatroomInfo chatroomInfo, Flux messageFlux); + Flux readMessages(ChatroomInfo chatroomInfo); +}