5d5feb87845cfbd536ff6f8364b87583b01d63cc
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / inmemory / InMemoryChatMessageService.java
1 package de.juplo.kafka.chat.backend.implementation.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.ChatMessageService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9
10 import java.time.LocalDateTime;
11 import java.util.LinkedHashMap;
12 import java.util.UUID;
13
14
15 @Slf4j
16 public class InMemoryChatMessageService implements ChatMessageService
17 {
18   private final UUID chatRoomId;
19   private final LinkedHashMap<Message.MessageKey, Message> messages;
20
21
22   public InMemoryChatMessageService(UUID chatRoomId)
23   {
24     log.debug("Creating InMemoryChatMessageService");
25     this.chatRoomId = chatRoomId;
26     messages = new LinkedHashMap<>();
27   }
28
29
30   Mono<Void> restore(StorageStrategy storageStrategy)
31   {
32     Flux<Message> messageFlux = storageStrategy.readChatRoomData(chatRoomId);
33
34     return messageFlux
35         .doOnNext(message -> messages.put(message.getKey(), message))
36         .count()
37         .doOnSuccess(count -> log.info("Restored InMemoryChatMessageService with {} messages", count))
38         .doOnError(throwable -> log.error("Could not restore InMemoryChatMessageService"))
39         .then();
40   }
41
42   @Override
43   public Mono<Message> persistMessage(
44       Message.MessageKey key,
45       LocalDateTime timestamp,
46       String text)
47   {
48     Message message = new Message(key, (long)messages.size(), timestamp, text);
49     messages.put(message.getKey(), message);
50     return Mono.just(message);
51   }
52
53   @Override
54   public Mono<Message> getMessage(Message.MessageKey key)
55   {
56     return Mono.fromSupplier(() -> messages.get(key));
57   }
58
59   @Override
60   public Flux<Message> getMessages(long first, long last)
61   {
62     return Flux.fromStream(messages
63         .values()
64         .stream()
65         .filter(message ->
66         {
67           long serial = message.getSerialNumber();
68           return serial >= first && serial <= last;
69         }));
70   }
71 }