X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2FInMemoryChatRoomService.java;h=f789d35906f797665dc7ccd8ec3d6fc3cecfc8c0;hb=af3553a0f5093819fc2af088b974375c7102ed9d;hp=1831037ebd208365e1e8308a02ae187b381349c3;hpb=cfda873368d7b3fdb4869fbce98a0d6e8ca69ab7;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatRoomService.java index 1831037e..f789d359 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatRoomService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatRoomService.java @@ -1,7 +1,6 @@ package de.juplo.kafka.chat.backend.persistence; import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.domain.MessageMutationException; import de.juplo.kafka.chat.backend.domain.ChatRoomService; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; @@ -24,34 +23,19 @@ public class InMemoryChatRoomService implements ChatRoomService public InMemoryChatRoomService(Flux messageFlux) { - log.debug("Creating InMemoryChatroomService"); + log.debug("Creating InMemoryChatRoomService"); messages = new LinkedHashMap<>(); - messageFlux.subscribe(message -> persistMessage(message)); + messageFlux.subscribe(message -> messages.put(message.getKey(), message)); } @Override - public Mono persistMessage( + public Message persistMessage( Message.MessageKey key, LocalDateTime timestamp, String text) { Message message = new Message(key, (long)messages.size(), timestamp, text); - return Mono.justOrEmpty(persistMessage(message)); - } - - private Message persistMessage(Message message) - { - Message.MessageKey key = message.getKey(); - Message existing = messages.get(key); - if (existing != null) - { - log.info("Message with key {} already exists; {}", key, existing); - if (!message.equals(existing)) - throw new MessageMutationException(message, existing); - return null; - } - - messages.put(key, message); + messages.put(message.getKey(), message); return message; }