From: Kai Moritz Date: Sun, 8 Jan 2023 09:10:34 +0000 (+0100) Subject: refactore: Renamed `PersistenceStrategy` to `ChatroomService` -- Move X-Git-Tag: wip~75 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=ae2a35f830ce23503deae96600b7c718983792a3;p=demos%2Fkafka%2Fchat refactore: Renamed `PersistenceStrategy` to `ChatroomService` -- Move --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatroomService.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatroomService.java new file mode 100644 index 00000000..d3a8364d --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatroomService.java @@ -0,0 +1,19 @@ +package de.juplo.kafka.chat.backend.domain; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.LocalDateTime; + + +public interface PersistenceStrategy +{ + Mono persistMessage( + Message.MessageKey key, + LocalDateTime timestamp, + String text); + + Mono getMessage(Message.MessageKey key); + + Flux getMessages(long first, long last); +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/PersistenceStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/domain/PersistenceStrategy.java deleted file mode 100644 index d3a8364d..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/PersistenceStrategy.java +++ /dev/null @@ -1,19 +0,0 @@ -package de.juplo.kafka.chat.backend.domain; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.time.LocalDateTime; - - -public interface PersistenceStrategy -{ - Mono persistMessage( - Message.MessageKey key, - LocalDateTime timestamp, - String text); - - Mono getMessage(Message.MessageKey key); - - Flux getMessages(long first, long last); -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatroomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatroomService.java new file mode 100644 index 00000000..4b522a82 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatroomService.java @@ -0,0 +1,76 @@ +package de.juplo.kafka.chat.backend.persistence; + +import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.domain.MessageMutationException; +import de.juplo.kafka.chat.backend.domain.PersistenceStrategy; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.LocalDateTime; +import java.util.LinkedHashMap; + + +@Slf4j +public class InMemoryPersistenceStrategy implements PersistenceStrategy +{ + private final LinkedHashMap messages; + + + public InMemoryPersistenceStrategy(LinkedHashMap messages) + { + this.messages = messages; + } + + public InMemoryPersistenceStrategy(Flux messageFlux) + { + log.debug("Creating InMemoryPersistenceStrategy"); + messages = new LinkedHashMap<>(); + messageFlux.subscribe(message -> persistMessage(message)); + } + + @Override + public Mono persistMessage( + Message.MessageKey key, + LocalDateTime timestamp, + String text) + { + Message message = new Message(key, (long)messages.size(), timestamp, text); + return Mono.justOrEmpty(persistMessage(message)); + } + + private Message persistMessage(Message message) + { + Message.MessageKey key = message.getKey(); + Message existing = messages.get(key); + if (existing != null) + { + log.info("Message with key {} already exists; {}", key, existing); + if (!message.equals(existing)) + throw new MessageMutationException(message, existing); + return null; + } + + messages.put(key, message); + return message; + } + + @Override + public Mono getMessage(Message.MessageKey key) + { + return Mono.fromSupplier(() -> messages.get(key)); + } + + @Override + public Flux getMessages(long first, long last) + { + return Flux.fromStream(messages + .values() + .stream() + .filter(message -> + { + long serial = message.getSerialNumber(); + return serial >= first && serial <= last; + })); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryPersistenceStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryPersistenceStrategy.java deleted file mode 100644 index 4b522a82..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryPersistenceStrategy.java +++ /dev/null @@ -1,76 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence; - -import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.domain.MessageMutationException; -import de.juplo.kafka.chat.backend.domain.PersistenceStrategy; -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.time.LocalDateTime; -import java.util.LinkedHashMap; - - -@Slf4j -public class InMemoryPersistenceStrategy implements PersistenceStrategy -{ - private final LinkedHashMap messages; - - - public InMemoryPersistenceStrategy(LinkedHashMap messages) - { - this.messages = messages; - } - - public InMemoryPersistenceStrategy(Flux messageFlux) - { - log.debug("Creating InMemoryPersistenceStrategy"); - messages = new LinkedHashMap<>(); - messageFlux.subscribe(message -> persistMessage(message)); - } - - @Override - public Mono persistMessage( - Message.MessageKey key, - LocalDateTime timestamp, - String text) - { - Message message = new Message(key, (long)messages.size(), timestamp, text); - return Mono.justOrEmpty(persistMessage(message)); - } - - private Message persistMessage(Message message) - { - Message.MessageKey key = message.getKey(); - Message existing = messages.get(key); - if (existing != null) - { - log.info("Message with key {} already exists; {}", key, existing); - if (!message.equals(existing)) - throw new MessageMutationException(message, existing); - return null; - } - - messages.put(key, message); - return message; - } - - @Override - public Mono getMessage(Message.MessageKey key) - { - return Mono.fromSupplier(() -> messages.get(key)); - } - - @Override - public Flux getMessages(long first, long last) - { - return Flux.fromStream(messages - .values() - .stream() - .filter(message -> - { - long serial = message.getSerialNumber(); - return serial >= first && serial <= last; - })); - } -}