7d4b9b62403c8565a0d8e356a2a81bf3f18f7f09
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / inmemory / InMemoryChatMessageService.java
1 package de.juplo.kafka.chat.backend.implementation.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.ChatMessageService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9
10 import java.time.LocalDateTime;
11 import java.util.LinkedHashMap;
12 import java.util.UUID;
13
14
15 @Slf4j
16 public class InMemoryChatMessageService implements ChatMessageService
17 {
18   private final UUID chatRoomId;
19   private final LinkedHashMap<Message.MessageKey, Message> messages;
20
21
22   public InMemoryChatMessageService(UUID chatRoomId)
23   {
24     log.debug("Creating InMemoryChatMessageService");
25     this.chatRoomId = chatRoomId;
26     messages = new LinkedHashMap<>();
27   }
28
29
30   Mono<Void> restore(StorageStrategy storageStrategy)
31   {
32     Flux<Message> messageFlux = storageStrategy.readChatRoomData(chatRoomId);
33
34     return messageFlux
35         .doOnNext(message -> messages.put(message.getKey(), message))
36         .then()
37         .doOnSuccess(empty -> log.info("Restored InMemoryChatMessageService"))
38         .doOnError(throwable -> log.error("Could not restore InMemoryChatMessageService"));
39   }
40
41   @Override
42   public Mono<Message> persistMessage(
43       Message.MessageKey key,
44       LocalDateTime timestamp,
45       String text)
46   {
47     Message message = new Message(key, (long)messages.size(), timestamp, text);
48     messages.put(message.getKey(), message);
49     return Mono.just(message);
50   }
51
52   @Override
53   public Mono<Message> getMessage(Message.MessageKey key)
54   {
55     return Mono.fromSupplier(() -> messages.get(key));
56   }
57
58   @Override
59   public Flux<Message> getMessages(long first, long last)
60   {
61     return Flux.fromStream(messages
62         .values()
63         .stream()
64         .filter(message ->
65         {
66           long serial = message.getSerialNumber();
67           return serial >= first && serial <= last;
68         }));
69   }
70 }