a9a76a590543b7188999c35835ce0bfe1a620027
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / inmemory / InMemoryChatMessageService.java
1 package de.juplo.kafka.chat.backend.implementation.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.ChatMessageService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
6 import lombok.Getter;
7 import lombok.extern.slf4j.Slf4j;
8 import reactor.core.publisher.Flux;
9 import reactor.core.publisher.Mono;
10
11 import java.time.LocalDateTime;
12 import java.util.LinkedHashMap;
13 import java.util.UUID;
14
15
16 @Slf4j
17 public class InMemoryChatMessageService implements ChatMessageService
18 {
19   @Getter
20   private final UUID chatRoomId;
21   private final LinkedHashMap<Message.MessageKey, Message> messages;
22
23
24   public InMemoryChatMessageService(UUID chatRoomId)
25   {
26     log.debug("Creating InMemoryChatMessageService");
27     this.chatRoomId = chatRoomId;
28     messages = new LinkedHashMap<>();
29   }
30
31
32   Mono<Void> restore(StorageStrategy storageStrategy)
33   {
34     Flux<Message> messageFlux = storageStrategy.readChatRoomData(chatRoomId);
35
36     return messageFlux
37         .doOnNext(message -> messages.put(message.getKey(), message))
38         .count()
39         .doOnSuccess(count -> log.info("Restored InMemoryChatMessageService with {} messages", count))
40         .doOnError(throwable -> log.error("Could not restore InMemoryChatMessageService"))
41         .then();
42   }
43
44   @Override
45   public Mono<Message> persistMessage(
46       Message.MessageKey key,
47       LocalDateTime timestamp,
48       String text)
49   {
50     Message message = new Message(key, (long)messages.size(), timestamp, text);
51     messages.put(message.getKey(), message);
52     return Mono.just(message);
53   }
54
55   @Override
56   public Mono<Message> getMessage(Message.MessageKey key)
57   {
58     return Mono.fromSupplier(() -> messages.get(key));
59   }
60
61   @Override
62   public Flux<Message> getMessages(long first, long last)
63   {
64     return Flux.fromStream(messages
65         .values()
66         .stream()
67         .filter(message ->
68         {
69           long serial = message.getSerialNumber();
70           return serial >= first && serial <= last;
71         }));
72   }
73 }