From: Kai Moritz Date: Wed, 28 Dec 2022 16:53:46 +0000 (+0100) Subject: refactor: Moved persistence-logic into a pluggable strategy X-Git-Tag: wip~91 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=0daa9d1a4d6f6b95ffdb42fcf1350ff26a9166e0;p=demos%2Fkafka%2Fchat refactor: Moved persistence-logic into a pluggable strategy --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java index 437ff1f7..17d2b1f5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java @@ -1,6 +1,7 @@ package de.juplo.kafka.chat.backend.api; import de.juplo.kafka.chat.backend.domain.Chatroom; +import de.juplo.kafka.chat.backend.domain.PersistenceStrategy; import lombok.RequiredArgsConstructor; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.*; @@ -20,13 +21,14 @@ import java.util.UUID; public class ChatBackendController { private final Map chatrooms = new HashMap<>(); + private final PersistenceStrategy persistenceStrategy; private final Clock clock; @PostMapping("create") public Chatroom create(@RequestBody String name) { - Chatroom chatroom = new Chatroom(UUID.randomUUID(), name); + Chatroom chatroom = new Chatroom(UUID.randomUUID(), name, persistenceStrategy); chatrooms.put(chatroom.getId(), chatroom); return chatroom; } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/Chatroom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/Chatroom.java index df6794aa..c05fda0a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/Chatroom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/Chatroom.java @@ -19,7 +19,7 @@ public class Chatroom private final UUID id; @Getter private final String name; - private final LinkedHashMap messages = new LinkedHashMap<>(); + private final PersistenceStrategy persistence; private final Sinks.Many sink = Sinks.many().multicast().onBackpressureBuffer(); synchronized public Mono addMessage( @@ -28,41 +28,15 @@ public class Chatroom String user, String text) { - return persistMessage(id, timestamp, user, text) + return persistence + .persistMessage(Message.MessageKey.of(user, id), timestamp, text) .doOnNext(message -> sink.tryEmitNext(message).orThrow()); } - private Mono persistMessage( - Long id, - LocalDateTime timestamp, - String user, - String text) - { - Message.MessageKey key = Message.MessageKey.of(user, id); - Message message = new Message(key, (long)messages.size(), timestamp, text); - - Message existing = messages.get(key); - if (existing != null) - { - log.info("Message with key {} already exists; {}", key, existing); - if (!message.equals(existing)) - throw new MessageMutationException(message, existing); - return Mono.empty(); - } - - messages.put(key, message); - return Mono - .fromSupplier(() -> message) - .log(); - } public Mono getMessage(String username, Long messageId) { - return Mono.fromSupplier(() -> - { - Message.MessageKey key = Message.MessageKey.of(username, messageId); - return messages.get(key); - }); + return persistence.getMessage(Message.MessageKey.of(username, messageId)); } public Flux listen() @@ -72,13 +46,6 @@ public class Chatroom public Flux getMessages(long first, long last) { - return Flux.fromStream(messages - .values() - .stream() - .filter(message -> - { - long serial = message.getSerialNumber(); - return serial >= first && serial <= last; - })); + return persistence.getMessages(first, last); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/PersistenceStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/domain/PersistenceStrategy.java new file mode 100644 index 00000000..452a62d9 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/PersistenceStrategy.java @@ -0,0 +1,20 @@ +package de.juplo.kafka.chat.backend.domain; + +import lombok.Value; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.LocalDateTime; + + +public interface PersistenceStrategy +{ + Mono persistMessage( + Message.MessageKey key, + LocalDateTime timestamp, + String text); + + Mono getMessage(Message.MessageKey key); + + Flux getMessages(long first, long last); +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryPersistenceStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryPersistenceStrategy.java new file mode 100644 index 00000000..f6c76fa9 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryPersistenceStrategy.java @@ -0,0 +1,69 @@ +package de.juplo.kafka.chat.backend.persistence; + +import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.domain.MessageMutationException; +import de.juplo.kafka.chat.backend.domain.PersistenceStrategy; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Value; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; + +import java.time.LocalDateTime; +import java.util.LinkedHashMap; +import java.util.UUID; +import java.util.stream.Stream; + + +@Component +@RequiredArgsConstructor +@Slf4j +public class InMemoryPersistenceStrategy implements PersistenceStrategy +{ + private final LinkedHashMap messages = new LinkedHashMap<>(); + + @Override + public Mono persistMessage( + Message.MessageKey key, + LocalDateTime timestamp, + String text) + { + Message message = new Message(key, (long)messages.size(), timestamp, text); + + Message existing = messages.get(key); + if (existing != null) + { + log.info("Message with key {} already exists; {}", key, existing); + if (!message.equals(existing)) + throw new MessageMutationException(message, existing); + return Mono.empty(); + } + + messages.put(key, message); + return Mono + .fromSupplier(() -> message) + .log(); + } + + @Override + public Mono getMessage(Message.MessageKey key) + { + return Mono.fromSupplier(() -> messages.get(key)); + } + + @Override + public Flux getMessages(long first, long last) + { + return Flux.fromStream(messages + .values() + .stream() + .filter(message -> + { + long serial = message.getSerialNumber(); + return serial >= first && serial <= last; + })); + } +}