refactor: RED - Refined success/error-handling for restore-operations
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / inmemory / InMemoryChatMessageService.java
1 package de.juplo.kafka.chat.backend.implementation.inmemory;
2
3 import de.juplo.kafka.chat.backend.domain.ChatMessageService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import lombok.extern.slf4j.Slf4j;
6 import reactor.core.publisher.Flux;
7 import reactor.core.publisher.Mono;
8
9 import java.time.LocalDateTime;
10 import java.util.LinkedHashMap;
11
12
13 @Slf4j
14 public class InMemoryChatMessageService implements ChatMessageService
15 {
16   private final LinkedHashMap<Message.MessageKey, Message> messages;
17
18
19   public InMemoryChatMessageService(Flux<Message> messageFlux)
20   {
21     log.debug("Creating InMemoryChatMessageService");
22     messages = new LinkedHashMap<>();
23     messageFlux
24         .doOnNext(message -> messages.put(message.getKey(), message))
25         .then()
26         .doOnSuccess(empty -> log.info("Restored InMemoryChatMessageService"))
27         .doOnError(throwable -> log.error("Could not restore InMemoryChatMessageService"))
28         .block();
29   }
30
31   @Override
32   public Mono<Message> persistMessage(
33       Message.MessageKey key,
34       LocalDateTime timestamp,
35       String text)
36   {
37     Message message = new Message(key, (long)messages.size(), timestamp, text);
38     messages.put(message.getKey(), message);
39     return Mono.just(message);
40   }
41
42   @Override
43   public Mono<Message> getMessage(Message.MessageKey key)
44   {
45     return Mono.fromSupplier(() -> messages.get(key));
46   }
47
48   @Override
49   public Flux<Message> getMessages(long first, long last)
50   {
51     return Flux.fromStream(messages
52         .values()
53         .stream()
54         .filter(message ->
55         {
56           long serial = message.getSerialNumber();
57           return serial >= first && serial <= last;
58         }));
59   }
60 }