refactor: Moved configuration in a separate class and made it more explicit
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / InMemoryPersistenceStrategy.java
1 package de.juplo.kafka.chat.backend.persistence;
2
3 import de.juplo.kafka.chat.backend.domain.Message;
4 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
5 import de.juplo.kafka.chat.backend.domain.PersistenceStrategy;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import reactor.core.publisher.Flux;
9 import reactor.core.publisher.Mono;
10
11 import java.time.LocalDateTime;
12 import java.util.LinkedHashMap;
13
14
15 @RequiredArgsConstructor
16 @Slf4j
17 public class InMemoryPersistenceStrategy implements PersistenceStrategy
18 {
19   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
20
21   @Override
22   public Mono<Message> persistMessage(
23       Message.MessageKey key,
24       LocalDateTime timestamp,
25       String text)
26   {
27     Message message = new Message(key, (long)messages.size(), timestamp, text);
28
29     Message existing = messages.get(key);
30     if (existing != null)
31     {
32       log.info("Message with key {} already exists; {}", key, existing);
33       if (!message.equals(existing))
34         throw new MessageMutationException(message, existing);
35       return Mono.empty();
36     }
37
38     messages.put(key, message);
39     return Mono
40         .fromSupplier(() -> message)
41         .log();
42   }
43
44   @Override
45   public Mono<Message> getMessage(Message.MessageKey key)
46   {
47     return Mono.fromSupplier(() -> messages.get(key));
48   }
49
50   @Override
51   public Flux<Message> getMessages(long first, long last)
52   {
53     return Flux.fromStream(messages
54         .values()
55         .stream()
56         .filter(message ->
57         {
58           long serial = message.getSerialNumber();
59           return serial >= first && serial <= last;
60         }));
61   }
62 }