NEU
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatRoomService.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.clients.producer.ProducerRecord;
9 import reactor.core.publisher.Flux;
10 import reactor.core.publisher.Mono;
11
12 import java.time.LocalDateTime;
13 import java.util.LinkedHashMap;
14 import java.util.UUID;
15
16
17 @RequiredArgsConstructor
18 @Slf4j
19 public class KafkaChatRoomService implements ChatRoomService
20 {
21   private final KafkaChatHomeService kafkaChatHomeService;
22   private final UUID chatRoomId;
23
24   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
25
26
27   @Override
28   public Mono<Message> persistMessage(
29     Message.MessageKey key,
30     LocalDateTime timestamp,
31     String text)
32   {
33     return kafkaChatHomeService
34         .sendMessage(chatRoomId, key, timestamp, text)
35         .doOnSuccess(message -> persistMessage(message));
36   }
37
38   public void persistMessage(Message message)
39   {
40     messages.put(message.getKey(), message)
41   }
42
43   @Override
44   synchronized public Mono<Message> getMessage(Message.MessageKey key)
45   {
46     // TODO: Aufrufe, auf eine Nachricht (einge gewisse Zeit) warten lassen
47     // und dann bedienen, wenn der der Callback vom Producer aufgerufen wird?
48     return Mono.fromSupplier(() -> messages.get(key));
49   }
50
51   @Override
52   synchronized public Flux<Message> getMessages(long first, long last)
53   {
54     return Flux.fromStream(messages
55       .values()
56       .stream()
57       .filter(message ->
58       {
59         long serial = message.getSerialNumber();
60         return serial >= first && serial <= last;
61       }));
62   }
63 }