1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.clients.producer.ProducerRecord;
9 import reactor.core.publisher.Flux;
10 import reactor.core.publisher.Mono;
12 import java.time.LocalDateTime;
13 import java.util.LinkedHashMap;
14 import java.util.UUID;
17 @RequiredArgsConstructor
19 public class KafkaChatRoomService implements ChatRoomService
21 private final KafkaChatHomeService kafkaChatHomeService;
22 private final UUID chatRoomId;
24 private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
28 public Mono<Message> persistMessage(
29 Message.MessageKey key,
30 LocalDateTime timestamp,
33 return kafkaChatHomeService.sendMessage(chatRoomId, key, timestamp, text);
37 * {@code synchronized} ist nicht nötig, da Aufruf immer indirekt über
38 * {@link #persistMessage(Message.MessageKey, LocalDateTime, String)}
40 protected void addMessage(Message message) throws MessageMutationException
42 Message existingMessage = messages.get(message.getKey());
44 // TODO: Ist der Test nötig, oder wird das durch den Kontrollierten Wechsel
45 // der Strategie ggf. schon abgefangen? Weil: Wenn es nur um die Sorge geht,
46 // das eine Nachricht schon "durch den Nutzer" anders geschrieben wurde,
47 // fängt dies bereits der ChatRoom ab.
48 // Die Überprüfung hier war vor dem Hintergrund der Sorge hinzugefügt worden,
49 // dass die Nachricht wegen Verschluckern in Kafka / beim Strategiewechsel / ??
50 // doppelt aufschlägt...
51 if (existingMessage == null)
53 messages.put(message.getKey(), message);
57 if (!existingMessage.getMessageText().equals(message.getMessageText()))
59 throw new MessageMutationException(existingMessage, message.getMessageText());
62 // Warn and emit existing message
64 "Keeping existing message with {}@{} for {}",
65 existingMessage.getSerialNumber(),
66 existingMessage.getTimestamp(),
67 existingMessage.getKey());
72 synchronized public Mono<Message> getMessage(Message.MessageKey key)
74 // TODO: Aufrufe, auf eine Nachricht (einge gewisse Zeit) warten lassen
75 // und dann bedienen, wenn der der Callback vom Producer aufgerufen wird?
76 return Mono.fromSupplier(() -> messages.get(key));
80 synchronized public Flux<Message> getMessages(long first, long last)
82 return Flux.fromStream(messages
87 long serial = message.getSerialNumber();
88 return serial >= first && serial <= last;