refactor: Moved persistence-logic into a pluggable strategy
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / InMemoryPersistenceStrategy.java
1 package de.juplo.kafka.chat.backend.persistence;
2
3 import de.juplo.kafka.chat.backend.domain.Message;
4 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
5 import de.juplo.kafka.chat.backend.domain.PersistenceStrategy;
6 import lombok.Getter;
7 import lombok.RequiredArgsConstructor;
8 import lombok.Value;
9 import lombok.extern.slf4j.Slf4j;
10 import org.springframework.stereotype.Component;
11 import reactor.core.publisher.Flux;
12 import reactor.core.publisher.Mono;
13 import reactor.core.publisher.Sinks;
14
15 import java.time.LocalDateTime;
16 import java.util.LinkedHashMap;
17 import java.util.UUID;
18 import java.util.stream.Stream;
19
20
21 @Component
22 @RequiredArgsConstructor
23 @Slf4j
24 public class InMemoryPersistenceStrategy implements PersistenceStrategy
25 {
26   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
27
28   @Override
29   public Mono<Message> persistMessage(
30       Message.MessageKey key,
31       LocalDateTime timestamp,
32       String text)
33   {
34     Message message = new Message(key, (long)messages.size(), timestamp, text);
35
36     Message existing = messages.get(key);
37     if (existing != null)
38     {
39       log.info("Message with key {} already exists; {}", key, existing);
40       if (!message.equals(existing))
41         throw new MessageMutationException(message, existing);
42       return Mono.empty();
43     }
44
45     messages.put(key, message);
46     return Mono
47         .fromSupplier(() -> message)
48         .log();
49   }
50
51   @Override
52   public Mono<Message> getMessage(Message.MessageKey key)
53   {
54     return Mono.fromSupplier(() -> messages.get(key));
55   }
56
57   @Override
58   public Flux<Message> getMessages(long first, long last)
59   {
60     return Flux.fromStream(messages
61         .values()
62         .stream()
63         .filter(message ->
64         {
65           long serial = message.getSerialNumber();
66           return serial >= first && serial <= last;
67         }));
68   }
69 }