16ed3a7039a2cb176dfbcfe9068b29c518e9c34e
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatRoomService.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
4 import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import reactor.core.publisher.Flux;
7 import reactor.core.publisher.Mono;
8
9 import java.time.LocalDateTime;
10 import java.util.LinkedHashMap;
11 import java.util.UUID;
12
13
14 @RequiredArgsConstructor
15 @Slf4j
16 public class KafkaChatRoomService implements ChatRoomService
17 {
18   private final KafkaChatHomeService kafkaChatHomeService;
19   private final UUID chatRoomId;
20
21   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
22
23
24   @Override
25   public Mono<Message> persistMessage(
26     Message.MessageKey key,
27     LocalDateTime timestamp,
28     String text)
29   {
30     return kafkaChatHomeService
31         .sendMessage(chatRoomId, key, timestamp, text)
32         .doOnSuccess(message -> persistMessage(message));
33   }
34
35   public void persistMessage(Message message)
36   {
37     messages.put(message.getKey(), message)
38   }
39
40   @Override
41   synchronized public Mono<Message> getMessage(Message.MessageKey key)
42   {
43     // TODO: Aufrufe, auf eine Nachricht (einge gewisse Zeit) warten lassen
44     // und dann bedienen, wenn der der Callback vom Producer aufgerufen wird?
45     return Mono.fromSupplier(() -> messages.get(key));
46   }
47
48   @Override
49   synchronized public Flux<Message> getMessages(long first, long last)
50   {
51     return Flux.fromStream(messages
52       .values()
53       .stream()
54       .filter(message ->
55       {
56         long serial = message.getSerialNumber();
57         return serial >= first && serial <= last;
58       }));
59   }
60 }