refactor: Moved exceptions into package `exceptions` - Aligned Code
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / inmemory / InMemoryChatRoomService.java
1 package de.juplo.kafka.chat.backend.persistence.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.Message;
4 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
5 import lombok.extern.slf4j.Slf4j;
6 import reactor.core.publisher.Flux;
7 import reactor.core.publisher.Mono;
8
9 import java.time.LocalDateTime;
10 import java.util.LinkedHashMap;
11
12
13 @Slf4j
14 public class InMemoryChatRoomService implements ChatRoomService
15 {
16   private final LinkedHashMap<Message.MessageKey, Message> messages;
17
18
19   public InMemoryChatRoomService(Flux<Message> messageFlux)
20   {
21     log.debug("Creating InMemoryChatRoomService");
22     messages = new LinkedHashMap<>();
23     messageFlux.subscribe(message -> messages.put(message.getKey(), message));
24   }
25
26   @Override
27   public Mono<Message> persistMessage(
28       Message.MessageKey key,
29       LocalDateTime timestamp,
30       String text)
31   {
32     Message message = new Message(key, (long)messages.size(), timestamp, text);
33     messages.put(message.getKey(), message);
34     return Mono.just(message);
35   }
36
37   @Override
38   public Mono<Message> getMessage(Message.MessageKey key)
39   {
40     return Mono.fromSupplier(() -> messages.get(key));
41   }
42
43   @Override
44   public Flux<Message> getMessages(long first, long last)
45   {
46     return Flux.fromStream(messages
47         .values()
48         .stream()
49         .filter(message ->
50         {
51           long serial = message.getSerialNumber();
52           return serial >= first && serial <= last;
53         }));
54   }
55 }