c4737a14f6557ad76d628b036427856a7f7363e5
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatRoomService.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.clients.producer.Producer;
8 import org.apache.kafka.common.TopicPartition;
9 import reactor.core.publisher.Flux;
10 import reactor.core.publisher.Mono;
11
12 import java.time.LocalDateTime;
13 import java.time.ZoneOffset;
14 import java.util.LinkedHashMap;
15 import java.util.UUID;
16
17
18 @Slf4j
19 public class KafkaChatRoomService implements ChatRoomService
20 {
21   private final Producer<String, MessageTo> producer;
22   private final TopicPartition tp;
23   private final UUID chatRoomId;
24   private final ZoneOffset zoneOffset;
25
26   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
27
28   private volatile MessageHandlingStrategy strategy;
29
30
31   public KafkaChatRoomService(
32       Producer<String, MessageTo> producer,
33       TopicPartition tp,
34       UUID chatRoomId,
35       ZoneOffset zoneOffset)
36   {
37     this.producer = producer;
38     this.tp = tp;
39     this.chatRoomId = chatRoomId;
40     this.zoneOffset = zoneOffset;
41     this.strategy = new ChatroomInactiveMessageHandlingStrategy(tp);
42   }
43
44
45   @Override
46   synchronized public Mono<Message> persistMessage(
47     Message.MessageKey key,
48     LocalDateTime timestamp,
49     String text)
50   {
51     return strategy.handleMessage(key, timestamp, text);
52   }
53
54   /**
55    * {@code synchronized} ist nicht nötig, da Aufruf immer indirekt über
56    * {@link #persistMessage(Message.MessageKey, LocalDateTime, String)}
57    */
58   protected void addMessage(Message message) throws MessageMutationException
59   {
60     Message existingMessage = messages.get(message.getKey());
61
62     // TODO: Ist der Test nötig, oder wird das durch den Kontrollierten Wechsel
63     // der Strategie ggf. schon abgefangen? Weil: Wenn es nur um die Sorge geht,
64     // das eine Nachricht schon "durch den Nutzer" anders geschrieben wurde,
65     // fängt dies bereits der ChatRoom ab.
66     // Die Überprüfung hier war vor dem Hintergrund der Sorge hinzugefügt worden,
67     // dass die Nachricht wegen Verschluckern in Kafka / beim Strategiewechsel / ??
68     // doppelt aufschlägt...
69     if (existingMessage == null)
70     {
71       messages.put(message.getKey(), message);
72     }
73     else
74     {
75       if (!existingMessage.getMessageText().equals(message.getMessageText()))
76       {
77         throw new MessageMutationException(existingMessage, message.getMessageText());
78       }
79
80       // Warn and emit existing message
81       log.warn(
82           "Keeping existing message with {}@{} for {}",
83           existingMessage.getSerialNumber(),
84           existingMessage.getTimestamp(),
85           existingMessage.getKey());
86     }
87   }
88
89   @Override
90   synchronized public Mono<Message> getMessage(Message.MessageKey key)
91   {
92     // TODO: Aufrufe, auf eine Nachricht (einge gewisse Zeit) warten lassen
93     // und dann bedienen, wenn der der Callback vom Producer aufgerufen wird?
94     return Mono.fromSupplier(() -> messages.get(key));
95   }
96
97   @Override
98   synchronized public Flux<Message> getMessages(long first, long last)
99   {
100     return Flux.fromStream(messages
101       .values()
102       .stream()
103       .filter(message ->
104       {
105         long serial = message.getSerialNumber();
106         return serial >= first && serial <= last;
107       }));
108   }
109 }