1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.clients.producer.Producer;
8 import org.apache.kafka.clients.producer.ProducerRecord;
9 import org.apache.kafka.common.TopicPartition;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
13 import java.time.LocalDateTime;
14 import java.util.LinkedHashMap;
18 public class KafkaChatRoomService implements ChatRoomService
20 private final Producer<String, MessageTo> producer;
21 private final TopicPartition tp;
23 private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
26 public KafkaChatRoomService(
27 Producer<String, MessageTo> producer,
30 this.producer = producer;
36 synchronized public Mono<Message> persistMessage(
37 Message.MessageKey key,
38 LocalDateTime timestamp,
41 return Mono.create(sink ->
43 ProducerRecord<String, MessageTo> record =
47 timestamp.toEpochSecond(zoneOffset),
48 chatRoomId.toString(),
49 MessageTo.of(key.getUsername(), key.getMessageId(), text));
51 producer.send(record, ((metadata, exception) ->
58 Message message = new Message(key, metadata.offset(), timestamp, text);
59 kafkaChatRoomService.addMessage(message);
67 sink.error(exception);
74 * {@code synchronized} ist nicht nötig, da Aufruf immer indirekt über
75 * {@link #persistMessage(Message.MessageKey, LocalDateTime, String)}
77 protected void addMessage(Message message) throws MessageMutationException
79 Message existingMessage = messages.get(message.getKey());
81 // TODO: Ist der Test nötig, oder wird das durch den Kontrollierten Wechsel
82 // der Strategie ggf. schon abgefangen? Weil: Wenn es nur um die Sorge geht,
83 // das eine Nachricht schon "durch den Nutzer" anders geschrieben wurde,
84 // fängt dies bereits der ChatRoom ab.
85 // Die Überprüfung hier war vor dem Hintergrund der Sorge hinzugefügt worden,
86 // dass die Nachricht wegen Verschluckern in Kafka / beim Strategiewechsel / ??
87 // doppelt aufschlägt...
88 if (existingMessage == null)
90 messages.put(message.getKey(), message);
94 if (!existingMessage.getMessageText().equals(message.getMessageText()))
96 throw new MessageMutationException(existingMessage, message.getMessageText());
99 // Warn and emit existing message
101 "Keeping existing message with {}@{} for {}",
102 existingMessage.getSerialNumber(),
103 existingMessage.getTimestamp(),
104 existingMessage.getKey());
109 synchronized public Mono<Message> getMessage(Message.MessageKey key)
111 // TODO: Aufrufe, auf eine Nachricht (einge gewisse Zeit) warten lassen
112 // und dann bedienen, wenn der der Callback vom Producer aufgerufen wird?
113 return Mono.fromSupplier(() -> messages.get(key));
117 synchronized public Flux<Message> getMessages(long first, long last)
119 return Flux.fromStream(messages
124 long serial = message.getSerialNumber();
125 return serial >= first && serial <= last;