ed155df873a7344d7fe46159bee8b4244b935e2e
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatRoomService.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.clients.producer.ProducerRecord;
9 import reactor.core.publisher.Flux;
10 import reactor.core.publisher.Mono;
11
12 import java.time.LocalDateTime;
13 import java.util.LinkedHashMap;
14 import java.util.UUID;
15
16
17 @RequiredArgsConstructor
18 @Slf4j
19 public class KafkaChatRoomService implements ChatRoomService
20 {
21   private final KafkaChatHomeService kafkaChatHomeService;
22   private final UUID chatRoomId;
23
24   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
25
26
27   @Override
28   public Mono<Message> persistMessage(
29     Message.MessageKey key,
30     LocalDateTime timestamp,
31     String text)
32   {
33     return kafkaChatHomeService.sendMessage(chatRoomId, key, timestamp, text);
34   }
35
36   /**
37    * {@code synchronized} ist nicht nötig, da Aufruf immer indirekt über
38    * {@link #persistMessage(Message.MessageKey, LocalDateTime, String)}
39    */
40   protected void addMessage(Message message) throws MessageMutationException
41   {
42     Message existingMessage = messages.get(message.getKey());
43
44     // TODO: Ist der Test nötig, oder wird das durch den Kontrollierten Wechsel
45     // der Strategie ggf. schon abgefangen? Weil: Wenn es nur um die Sorge geht,
46     // das eine Nachricht schon "durch den Nutzer" anders geschrieben wurde,
47     // fängt dies bereits der ChatRoom ab.
48     // Die Überprüfung hier war vor dem Hintergrund der Sorge hinzugefügt worden,
49     // dass die Nachricht wegen Verschluckern in Kafka / beim Strategiewechsel / ??
50     // doppelt aufschlägt...
51     if (existingMessage == null)
52     {
53       messages.put(message.getKey(), message);
54     }
55     else
56     {
57       if (!existingMessage.getMessageText().equals(message.getMessageText()))
58       {
59         throw new MessageMutationException(existingMessage, message.getMessageText());
60       }
61
62       // Warn and emit existing message
63       log.warn(
64           "Keeping existing message with {}@{} for {}",
65           existingMessage.getSerialNumber(),
66           existingMessage.getTimestamp(),
67           existingMessage.getKey());
68     }
69   }
70
71   @Override
72   synchronized public Mono<Message> getMessage(Message.MessageKey key)
73   {
74     // TODO: Aufrufe, auf eine Nachricht (einge gewisse Zeit) warten lassen
75     // und dann bedienen, wenn der der Callback vom Producer aufgerufen wird?
76     return Mono.fromSupplier(() -> messages.get(key));
77   }
78
79   @Override
80   synchronized public Flux<Message> getMessages(long first, long last)
81   {
82     return Flux.fromStream(messages
83       .values()
84       .stream()
85       .filter(message ->
86       {
87         long serial = message.getSerialNumber();
88         return serial >= first && serial <= last;
89       }));
90   }
91 }