refactore: Renamed `PersistenceStrategy` to `ChatroomService` -- Rename
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / InMemoryChatroomService.java
1 package de.juplo.kafka.chat.backend.persistence;
2
3 import de.juplo.kafka.chat.backend.domain.Message;
4 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
5 import de.juplo.kafka.chat.backend.domain.ChatroomService;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9
10 import java.time.LocalDateTime;
11 import java.util.LinkedHashMap;
12
13
14 @Slf4j
15 public class InMemoryChatroomService implements ChatroomService
16 {
17   private final LinkedHashMap<Message.MessageKey, Message> messages;
18
19
20   public InMemoryChatroomService(LinkedHashMap<Message.MessageKey, Message> messages)
21   {
22     this.messages = messages;
23   }
24
25   public InMemoryChatroomService(Flux<Message> messageFlux)
26   {
27     log.debug("Creating InMemoryChatroomService");
28     messages = new LinkedHashMap<>();
29     messageFlux.subscribe(message -> persistMessage(message));
30   }
31
32   @Override
33   public Mono<Message> persistMessage(
34       Message.MessageKey key,
35       LocalDateTime timestamp,
36       String text)
37   {
38     Message message = new Message(key, (long)messages.size(), timestamp, text);
39     return Mono.justOrEmpty(persistMessage(message));
40   }
41
42   private Message persistMessage(Message message)
43   {
44     Message.MessageKey key = message.getKey();
45     Message existing = messages.get(key);
46     if (existing != null)
47     {
48       log.info("Message with key {} already exists; {}", key, existing);
49       if (!message.equals(existing))
50         throw new MessageMutationException(message, existing);
51       return null;
52     }
53
54     messages.put(key, message);
55     return message;
56   }
57
58   @Override
59   public Mono<Message> getMessage(Message.MessageKey key)
60   {
61     return Mono.fromSupplier(() -> messages.get(key));
62   }
63
64   @Override
65   public Flux<Message> getMessages(long first, long last)
66   {
67     return Flux.fromStream(messages
68         .values()
69         .stream()
70         .filter(message ->
71         {
72           long serial = message.getSerialNumber();
73           return serial >= first && serial <= last;
74         }));
75   }
76 }