TMP:test:FIX
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / KafkaChatMessageService.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatMessageService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import lombok.Getter;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import reactor.core.publisher.Flux;
9 import reactor.core.publisher.Mono;
10
11 import java.time.LocalDateTime;
12 import java.util.LinkedHashMap;
13 import java.util.UUID;
14
15
16 @RequiredArgsConstructor
17 @Slf4j
18 public class KafkaChatMessageService implements ChatMessageService
19 {
20   private final DataChannel dataChannel;
21   @Getter
22   private final UUID chatRoomId;
23
24   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
25
26
27   @Override
28   public Mono<Message> persistMessage(
29     Message.MessageKey key,
30     LocalDateTime timestamp,
31     String text)
32   {
33     return dataChannel
34         .sendChatMessage(chatRoomId, key, timestamp, text)
35         .doOnSuccess(message -> persistMessage(message));
36   }
37
38   void persistMessage(Message message)
39   {
40     messages.put  (message.getKey(), message);
41   }
42
43   @Override
44   synchronized public Mono<Message> getMessage(Message.MessageKey key)
45   {
46     return Mono.fromSupplier(() -> messages.get(key));
47   }
48
49   @Override
50   synchronized public Flux<Message> getMessages(long first, long last)
51   {
52     return Flux.fromStream(messages
53       .values()
54       .stream()
55       .filter(message ->
56       {
57         long serial = message.getSerialNumber();
58         return serial >= first && serial <= last;
59       }));
60   }
61 }