1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.clients.producer.Producer;
8 import org.apache.kafka.common.TopicPartition;
9 import reactor.core.publisher.Flux;
10 import reactor.core.publisher.Mono;
12 import java.time.LocalDateTime;
13 import java.util.LinkedHashMap;
17 public class KafkaChatRoomService implements ChatRoomService
19 private final Producer<String, MessageTo> producer;
20 private final TopicPartition tp;
22 private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
24 private volatile MessageHandlingStrategy strategy;
27 public KafkaChatRoomService(
28 Producer<String, MessageTo> producer,
31 this.producer = producer;
33 this.strategy = new ChatroomInactiveMessageHandlingStrategy(tp);
38 synchronized public Mono<Message> persistMessage(
39 Message.MessageKey key,
40 LocalDateTime timestamp,
43 return strategy.handleMessage(key, timestamp, text);
47 * {@code synchronized} ist nicht nötig, da Aufruf immer indirekt über
48 * {@link #persistMessage(Message.MessageKey, LocalDateTime, String)}
50 protected void addMessage(Message message) throws MessageMutationException
52 Message existingMessage = messages.get(message.getKey());
54 // TODO: Ist der Test nötig, oder wird das durch den Kontrollierten Wechsel
55 // der Strategie ggf. schon abgefangen? Weil: Wenn es nur um die Sorge geht,
56 // das eine Nachricht schon "durch den Nutzer" anders geschrieben wurde,
57 // fängt dies bereits der ChatRoom ab.
58 // Die Überprüfung hier war vor dem Hintergrund der Sorge hinzugefügt worden,
59 // dass die Nachricht wegen Verschluckern in Kafka / beim Strategiewechsel / ??
60 // doppelt aufschlägt...
61 if (existingMessage == null)
63 messages.put(message.getKey(), message);
67 if (!existingMessage.getMessageText().equals(message.getMessageText()))
69 throw new MessageMutationException(existingMessage, message.getMessageText());
72 // Warn and emit existing message
74 "Keeping existing message with {}@{} for {}",
75 existingMessage.getSerialNumber(),
76 existingMessage.getTimestamp(),
77 existingMessage.getKey());
82 synchronized public Mono<Message> getMessage(Message.MessageKey key)
84 // TODO: Aufrufe, auf eine Nachricht (einge gewisse Zeit) warten lassen
85 // und dann bedienen, wenn der der Callback vom Producer aufgerufen wird?
86 return Mono.fromSupplier(() -> messages.get(key));
90 synchronized public Flux<Message> getMessages(long first, long last)
92 return Flux.fromStream(messages
97 long serial = message.getSerialNumber();
98 return serial >= first && serial <= last;