WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatRoomService.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.clients.producer.Producer;
8 import org.apache.kafka.common.TopicPartition;
9 import reactor.core.publisher.Flux;
10 import reactor.core.publisher.Mono;
11
12 import java.time.LocalDateTime;
13 import java.util.LinkedHashMap;
14
15
16 @Slf4j
17 public class KafkaChatRoomService implements ChatRoomService
18 {
19   private final Producer<String, MessageTo> producer;
20   private final TopicPartition tp;
21
22   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
23
24   private volatile MessageHandlingStrategy strategy;
25
26
27   public KafkaChatRoomService(
28       Producer<String, MessageTo> producer,
29       TopicPartition tp)
30   {
31     this.producer = producer;
32     this.tp = tp;
33     this.strategy = new ChatroomInactiveMessageHandlingStrategy(tp);
34   }
35
36
37   @Override
38   synchronized public Mono<Message> persistMessage(
39     Message.MessageKey key,
40     LocalDateTime timestamp,
41     String text)
42   {
43     return strategy.handleMessage(key, timestamp, text);
44   }
45
46   /**
47    * {@code synchronized} ist nicht nötig, da Aufruf immer indirekt über
48    * {@link #persistMessage(Message.MessageKey, LocalDateTime, String)}
49    */
50   protected void addMessage(Message message) throws MessageMutationException
51   {
52     Message existingMessage = messages.get(message.getKey());
53
54     // TODO: Ist der Test nötig, oder wird das durch den Kontrollierten Wechsel
55     // der Strategie ggf. schon abgefangen? Weil: Wenn es nur um die Sorge geht,
56     // das eine Nachricht schon "durch den Nutzer" anders geschrieben wurde,
57     // fängt dies bereits der ChatRoom ab.
58     // Die Überprüfung hier war vor dem Hintergrund der Sorge hinzugefügt worden,
59     // dass die Nachricht wegen Verschluckern in Kafka / beim Strategiewechsel / ??
60     // doppelt aufschlägt...
61     if (existingMessage == null)
62     {
63       messages.put(message.getKey(), message);
64     }
65     else
66     {
67       if (!existingMessage.getMessageText().equals(message.getMessageText()))
68       {
69         throw new MessageMutationException(existingMessage, message.getMessageText());
70       }
71
72       // Warn and emit existing message
73       log.warn(
74           "Keeping existing message with {}@{} for {}",
75           existingMessage.getSerialNumber(),
76           existingMessage.getTimestamp(),
77           existingMessage.getKey());
78     }
79   }
80
81   @Override
82   synchronized public Mono<Message> getMessage(Message.MessageKey key)
83   {
84     // TODO: Aufrufe, auf eine Nachricht (einge gewisse Zeit) warten lassen
85     // und dann bedienen, wenn der der Callback vom Producer aufgerufen wird?
86     return Mono.fromSupplier(() -> messages.get(key));
87   }
88
89   @Override
90   synchronized public Flux<Message> getMessages(long first, long last)
91   {
92     return Flux.fromStream(messages
93       .values()
94       .stream()
95       .filter(message ->
96       {
97         long serial = message.getSerialNumber();
98         return serial >= first && serial <= last;
99       }));
100   }
101 }