1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.clients.producer.Producer;
8 import org.apache.kafka.common.TopicPartition;
9 import reactor.core.publisher.Flux;
10 import reactor.core.publisher.Mono;
12 import java.time.LocalDateTime;
13 import java.time.ZoneOffset;
14 import java.util.LinkedHashMap;
15 import java.util.UUID;
19 public class KafkaChatRoomService implements ChatRoomService
21 private final Producer<String, MessageTo> producer;
22 private final TopicPartition tp;
23 private final UUID chatRoomId;
24 private final ZoneOffset zoneOffset;
26 private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
28 private volatile MessageHandlingStrategy strategy;
31 public KafkaChatRoomService(
32 Producer<String, MessageTo> producer,
35 ZoneOffset zoneOffset)
37 this.producer = producer;
39 this.chatRoomId = chatRoomId;
40 this.zoneOffset = zoneOffset;
41 this.strategy = new ChatroomInactiveMessageHandlingStrategy(tp);
46 synchronized public Mono<Message> persistMessage(
47 Message.MessageKey key,
48 LocalDateTime timestamp,
51 return strategy.handleMessage(key, timestamp, text);
55 * {@code synchronized} ist nicht nötig, da Aufruf immer indirekt über
56 * {@link #persistMessage(Message.MessageKey, LocalDateTime, String)}
58 protected void addMessage(Message message) throws MessageMutationException
60 Message existingMessage = messages.get(message.getKey());
62 // TODO: Ist der Test nötig, oder wird das durch den Kontrollierten Wechsel
63 // der Strategie ggf. schon abgefangen? Weil: Wenn es nur um die Sorge geht,
64 // das eine Nachricht schon "durch den Nutzer" anders geschrieben wurde,
65 // fängt dies bereits der ChatRoom ab.
66 // Die Überprüfung hier war vor dem Hintergrund der Sorge hinzugefügt worden,
67 // dass die Nachricht wegen Verschluckern in Kafka / beim Strategiewechsel / ??
68 // doppelt aufschlägt...
69 if (existingMessage == null)
71 messages.put(message.getKey(), message);
75 if (!existingMessage.getMessageText().equals(message.getMessageText()))
77 throw new MessageMutationException(existingMessage, message.getMessageText());
80 // Warn and emit existing message
82 "Keeping existing message with {}@{} for {}",
83 existingMessage.getSerialNumber(),
84 existingMessage.getTimestamp(),
85 existingMessage.getKey());
90 synchronized public Mono<Message> getMessage(Message.MessageKey key)
92 // TODO: Aufrufe, auf eine Nachricht (einge gewisse Zeit) warten lassen
93 // und dann bedienen, wenn der der Callback vom Producer aufgerufen wird?
94 return Mono.fromSupplier(() -> messages.get(key));
98 synchronized public Flux<Message> getMessages(long first, long last)
100 return Flux.fromStream(messages
105 long serial = message.getSerialNumber();
106 return serial >= first && serial <= last;