From: Kai Moritz Date: Mon, 11 Sep 2023 14:18:42 +0000 (+0200) Subject: refactor: Renamed `ChatRoomService` into `ChatMessageService` -- MOVE X-Git-Tag: rebase--2024-02-03--15-10~26 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=518646c28916f389307c0e6ec38cf11ef897fbf3;p=demos%2Fkafka%2Fchat refactor: Renamed `ChatRoomService` into `ChatMessageService` -- MOVE --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatMessageService.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatMessageService.java new file mode 100644 index 00000000..374a442b --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatMessageService.java @@ -0,0 +1,19 @@ +package de.juplo.kafka.chat.backend.domain; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.LocalDateTime; + + +public interface ChatRoomService +{ + Mono persistMessage( + Message.MessageKey key, + LocalDateTime timestamp, + String text); + + Mono getMessage(Message.MessageKey key); + + Flux getMessages(long first, long last); +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomService.java deleted file mode 100644 index 374a442b..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomService.java +++ /dev/null @@ -1,19 +0,0 @@ -package de.juplo.kafka.chat.backend.domain; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.time.LocalDateTime; - - -public interface ChatRoomService -{ - Mono persistMessage( - Message.MessageKey key, - LocalDateTime timestamp, - String text); - - Mono getMessage(Message.MessageKey key); - - Flux getMessages(long first, long last); -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java new file mode 100644 index 00000000..a4fcec8c --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java @@ -0,0 +1,55 @@ +package de.juplo.kafka.chat.backend.implementation.inmemory; + +import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.domain.ChatRoomService; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.LocalDateTime; +import java.util.LinkedHashMap; + + +@Slf4j +public class InMemoryChatRoomService implements ChatRoomService +{ + private final LinkedHashMap messages; + + + public InMemoryChatRoomService(Flux messageFlux) + { + log.debug("Creating InMemoryChatRoomService"); + messages = new LinkedHashMap<>(); + messageFlux.subscribe(message -> messages.put(message.getKey(), message)); + } + + @Override + public Mono persistMessage( + Message.MessageKey key, + LocalDateTime timestamp, + String text) + { + Message message = new Message(key, (long)messages.size(), timestamp, text); + messages.put(message.getKey(), message); + return Mono.just(message); + } + + @Override + public Mono getMessage(Message.MessageKey key) + { + return Mono.fromSupplier(() -> messages.get(key)); + } + + @Override + public Flux getMessages(long first, long last) + { + return Flux.fromStream(messages + .values() + .stream() + .filter(message -> + { + long serial = message.getSerialNumber(); + return serial >= first && serial <= last; + })); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatRoomService.java deleted file mode 100644 index a4fcec8c..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatRoomService.java +++ /dev/null @@ -1,55 +0,0 @@ -package de.juplo.kafka.chat.backend.implementation.inmemory; - -import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.domain.ChatRoomService; -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.time.LocalDateTime; -import java.util.LinkedHashMap; - - -@Slf4j -public class InMemoryChatRoomService implements ChatRoomService -{ - private final LinkedHashMap messages; - - - public InMemoryChatRoomService(Flux messageFlux) - { - log.debug("Creating InMemoryChatRoomService"); - messages = new LinkedHashMap<>(); - messageFlux.subscribe(message -> messages.put(message.getKey(), message)); - } - - @Override - public Mono persistMessage( - Message.MessageKey key, - LocalDateTime timestamp, - String text) - { - Message message = new Message(key, (long)messages.size(), timestamp, text); - messages.put(message.getKey(), message); - return Mono.just(message); - } - - @Override - public Mono getMessage(Message.MessageKey key) - { - return Mono.fromSupplier(() -> messages.get(key)); - } - - @Override - public Flux getMessages(long first, long last) - { - return Flux.fromStream(messages - .values() - .stream() - .filter(message -> - { - long serial = message.getSerialNumber(); - return serial >= first && serial <= last; - })); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java new file mode 100644 index 00000000..084798c7 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java @@ -0,0 +1,58 @@ +package de.juplo.kafka.chat.backend.implementation.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatRoomService; +import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.LocalDateTime; +import java.util.LinkedHashMap; +import java.util.UUID; + + +@RequiredArgsConstructor +@Slf4j +public class KafkaChatRoomService implements ChatRoomService +{ + private final ChatRoomChannel chatRoomChannel; + private final UUID chatRoomId; + + private final LinkedHashMap messages = new LinkedHashMap<>(); + + + @Override + public Mono persistMessage( + Message.MessageKey key, + LocalDateTime timestamp, + String text) + { + return chatRoomChannel + .sendChatMessage(chatRoomId, key, timestamp, text) + .doOnSuccess(message -> persistMessage(message)); + } + + void persistMessage(Message message) + { + messages.put (message.getKey(), message); + } + + @Override + synchronized public Mono getMessage(Message.MessageKey key) + { + return Mono.fromSupplier(() -> messages.get(key)); + } + + @Override + synchronized public Flux getMessages(long first, long last) + { + return Flux.fromStream(messages + .values() + .stream() + .filter(message -> + { + long serial = message.getSerialNumber(); + return serial >= first && serial <= last; + })); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatRoomService.java deleted file mode 100644 index 084798c7..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatRoomService.java +++ /dev/null @@ -1,58 +0,0 @@ -package de.juplo.kafka.chat.backend.implementation.kafka; - -import de.juplo.kafka.chat.backend.domain.ChatRoomService; -import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.time.LocalDateTime; -import java.util.LinkedHashMap; -import java.util.UUID; - - -@RequiredArgsConstructor -@Slf4j -public class KafkaChatRoomService implements ChatRoomService -{ - private final ChatRoomChannel chatRoomChannel; - private final UUID chatRoomId; - - private final LinkedHashMap messages = new LinkedHashMap<>(); - - - @Override - public Mono persistMessage( - Message.MessageKey key, - LocalDateTime timestamp, - String text) - { - return chatRoomChannel - .sendChatMessage(chatRoomId, key, timestamp, text) - .doOnSuccess(message -> persistMessage(message)); - } - - void persistMessage(Message message) - { - messages.put (message.getKey(), message); - } - - @Override - synchronized public Mono getMessage(Message.MessageKey key) - { - return Mono.fromSupplier(() -> messages.get(key)); - } - - @Override - synchronized public Flux getMessages(long first, long last) - { - return Flux.fromStream(messages - .values() - .stream() - .filter(message -> - { - long serial = message.getSerialNumber(); - return serial >= first && serial <= last; - })); - } -}