1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.clients.producer.Producer;
8 import org.apache.kafka.common.TopicPartition;
9 import reactor.core.publisher.Flux;
10 import reactor.core.publisher.Mono;
12 import java.time.LocalDateTime;
13 import java.time.ZoneOffset;
14 import java.util.LinkedHashMap;
15 import java.util.UUID;
19 public class KafkaChatRoomService implements ChatRoomService
21 private final Producer<String, MessageTo> producer;
22 private final TopicPartition tp;
23 private final UUID chatRoomId;
24 private final ZoneOffset zoneOffset;
26 private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
28 private volatile MessageHandlingStrategy strategy;
31 public KafkaChatRoomService(
32 Producer<String, MessageTo> producer,
35 ZoneOffset zoneOffset)
37 this.producer = producer;
39 this.chatRoomId = chatRoomId;
40 this.zoneOffset = zoneOffset;
41 this.strategy = new ChatroomInactiveMessageHandlingStrategy(tp);
46 synchronized public Mono<Message> persistMessage(
47 Message.MessageKey key,
48 LocalDateTime timestamp,
51 return strategy.persistMessage(key, timestamp, text);
54 protected void addMessage(Message message) throws MessageMutationException
56 Message existingMessage = messages.get(message.getKey());
58 // TODO: Ist der Test nötig, oder wird das durch den Kontrollierten Wechsel
59 // der Strategie ggf. schon abgefangen? Weil: Wenn es nur um die Sorge geht,
60 // das eine Nachricht schon "durch den Nutzer" anders geschrieben wurde,
61 // fängt dies bereits der ChatRoom ab.
62 // Die Überprüfung hier war vor dem Hintergrund der Sorge hinzugefügt worden,
63 // dass die Nachricht wegen Verschluckern in Kafka / beim Strategiewechsel / ??
64 // doppelt aufschlägt...
65 if (existingMessage == null)
67 messages.put(message.getKey(), message);
71 if (!existingMessage.getMessageText().equals(message.getMessageText()))
73 throw new MessageMutationException(existingMessage, message.getMessageText());
76 // Warn and emit existing message
78 "Keeping existing message with {}@{} for {}",
79 existingMessage.getSerialNumber(),
80 existingMessage.getTimestamp(),
81 existingMessage.getKey());
86 synchronized public Mono<Message> getMessage(Message.MessageKey key)
88 // TODO: Aufrufe, auf eine Nachricht (einge gewisse Zeit) warten lassen
89 // und dann bedienen, wenn der der Callback vom Producer aufgerufen wird?
90 return Mono.fromSupplier(() -> messages.get(key));
94 synchronized public Flux<Message> getMessages(long first, long last)
96 return Flux.fromStream(messages
101 long serial = message.getSerialNumber();
102 return serial >= first && serial <= last;