91b50312cf80a86f28ee35c87db10db2174f8f1d
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatRoomService.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.clients.producer.Producer;
8 import org.apache.kafka.common.TopicPartition;
9 import reactor.core.publisher.Flux;
10 import reactor.core.publisher.Mono;
11
12 import java.time.LocalDateTime;
13 import java.time.ZoneOffset;
14 import java.util.LinkedHashMap;
15 import java.util.UUID;
16
17
18 @Slf4j
19 public class KafkaChatRoomService implements ChatRoomService
20 {
21   private final Producer<String, MessageTo> producer;
22   private final TopicPartition tp;
23   private final UUID chatRoomId;
24   private final ZoneOffset zoneOffset;
25
26   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
27
28   private volatile MessageHandlingStrategy strategy;
29
30
31   public KafkaChatRoomService(
32       Producer<String, MessageTo> producer,
33       TopicPartition tp,
34       UUID chatRoomId,
35       ZoneOffset zoneOffset)
36   {
37     this.producer = producer;
38     this.tp = tp;
39     this.chatRoomId = chatRoomId;
40     this.zoneOffset = zoneOffset;
41     this.strategy = new ChatroomInactiveMessageHandlingStrategy(tp);
42   }
43
44
45   @Override
46   synchronized public Mono<Message> persistMessage(
47     Message.MessageKey key,
48     LocalDateTime timestamp,
49     String text)
50   {
51     return strategy.persistMessage(key, timestamp, text);
52   }
53
54   protected void addMessage(Message message) throws MessageMutationException
55   {
56     Message existingMessage = messages.get(message.getKey());
57
58     // TODO: Ist der Test nötig, oder wird das durch den Kontrollierten Wechsel
59     // der Strategie ggf. schon abgefangen? Weil: Wenn es nur um die Sorge geht,
60     // das eine Nachricht schon "durch den Nutzer" anders geschrieben wurde,
61     // fängt dies bereits der ChatRoom ab.
62     // Die Überprüfung hier war vor dem Hintergrund der Sorge hinzugefügt worden,
63     // dass die Nachricht wegen Verschluckern in Kafka / beim Strategiewechsel / ??
64     // doppelt aufschlägt...
65     if (existingMessage == null)
66     {
67       messages.put(message.getKey(), message);
68     }
69     else
70     {
71       if (!existingMessage.getMessageText().equals(message.getMessageText()))
72       {
73         throw new MessageMutationException(existingMessage, message.getMessageText());
74       }
75
76       // Warn and emit existing message
77       log.warn(
78           "Keeping existing message with {}@{} for {}",
79           existingMessage.getSerialNumber(),
80           existingMessage.getTimestamp(),
81           existingMessage.getKey());
82     }
83   }
84
85   @Override
86   synchronized public Mono<Message> getMessage(Message.MessageKey key)
87   {
88     // TODO: Aufrufe, auf eine Nachricht (einge gewisse Zeit) warten lassen
89     // und dann bedienen, wenn der der Callback vom Producer aufgerufen wird?
90     return Mono.fromSupplier(() -> messages.get(key));
91   }
92
93   @Override
94   synchronized public Flux<Message> getMessages(long first, long last)
95   {
96     return Flux.fromStream(messages
97       .values()
98       .stream()
99       .filter(message ->
100       {
101         long serial = message.getSerialNumber();
102         return serial >= first && serial <= last;
103       }));
104   }
105 }