feat: Added a POST-mapping to force the storing of the data
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / InMemoryPersistenceStrategy.java
1 package de.juplo.kafka.chat.backend.persistence;
2
3 import de.juplo.kafka.chat.backend.domain.Message;
4 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
5 import de.juplo.kafka.chat.backend.domain.PersistenceStrategy;
6 import lombok.extern.slf4j.Slf4j;
7 import reactor.core.publisher.Flux;
8 import reactor.core.publisher.Mono;
9
10 import java.time.LocalDateTime;
11 import java.util.LinkedHashMap;
12
13
14 @Slf4j
15 public class InMemoryPersistenceStrategy implements PersistenceStrategy
16 {
17   private final LinkedHashMap<Message.MessageKey, Message> messages;
18
19
20   public InMemoryPersistenceStrategy(LinkedHashMap<Message.MessageKey, Message> messages)
21   {
22     this.messages = messages;
23   }
24
25
26   @Override
27   public Mono<Message> persistMessage(
28       Message.MessageKey key,
29       LocalDateTime timestamp,
30       String text)
31   {
32     Message message = new Message(key, (long)messages.size(), timestamp, text);
33
34     Message existing = messages.get(key);
35     if (existing != null)
36     {
37       log.info("Message with key {} already exists; {}", key, existing);
38       if (!message.equals(existing))
39         throw new MessageMutationException(message, existing);
40       return Mono.empty();
41     }
42
43     messages.put(key, message);
44     return Mono
45         .fromSupplier(() -> message)
46         .log();
47   }
48
49   @Override
50   public Mono<Message> getMessage(Message.MessageKey key)
51   {
52     return Mono.fromSupplier(() -> messages.get(key));
53   }
54
55   @Override
56   public Flux<Message> getMessages(long first, long last)
57   {
58     return Flux.fromStream(messages
59         .values()
60         .stream()
61         .filter(message ->
62         {
63           long serial = message.getSerialNumber();
64           return serial >= first && serial <= last;
65         }));
66   }
67 }