f036efe135071da00e057e4ce73c3f7d68652b8c
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatRoomService.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.clients.producer.Producer;
8 import org.apache.kafka.clients.producer.ProducerRecord;
9 import org.apache.kafka.common.TopicPartition;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
12
13 import java.time.LocalDateTime;
14 import java.util.LinkedHashMap;
15
16
17 @Slf4j
18 public class KafkaChatRoomService implements ChatRoomService
19 {
20   private final Producer<String, MessageTo> producer;
21   private final TopicPartition tp;
22
23   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
24
25
26   public KafkaChatRoomService(
27       Producer<String, MessageTo> producer,
28       TopicPartition tp)
29   {
30     this.producer = producer;
31     this.tp = tp;
32   }
33
34
35   @Override
36   synchronized public Mono<Message> persistMessage(
37     Message.MessageKey key,
38     LocalDateTime timestamp,
39     String text)
40   {
41     return Mono.create(sink ->
42     {
43       ProducerRecord<String, MessageTo> record =
44           new ProducerRecord<>(
45               tp.topic(),
46               tp.partition(),
47               timestamp.toEpochSecond(zoneOffset),
48               chatRoomId.toString(),
49               MessageTo.of(key.getUsername(), key.getMessageId(), text));
50
51       producer.send(record, ((metadata, exception) ->
52       {
53         if (metadata != null)
54         {
55           // On successful send
56           {
57             // Emit new message
58             Message message = new Message(key, metadata.offset(), timestamp, text);
59             kafkaChatRoomService.addMessage(message);
60           }
61
62           sink.success();
63         }
64         else
65         {
66           // On send-failure
67           sink.error(exception);
68         }
69       }));
70     });
71   }
72
73   /**
74    * {@code synchronized} ist nicht nötig, da Aufruf immer indirekt über
75    * {@link #persistMessage(Message.MessageKey, LocalDateTime, String)}
76    */
77   protected void addMessage(Message message) throws MessageMutationException
78   {
79     Message existingMessage = messages.get(message.getKey());
80
81     // TODO: Ist der Test nötig, oder wird das durch den Kontrollierten Wechsel
82     // der Strategie ggf. schon abgefangen? Weil: Wenn es nur um die Sorge geht,
83     // das eine Nachricht schon "durch den Nutzer" anders geschrieben wurde,
84     // fängt dies bereits der ChatRoom ab.
85     // Die Überprüfung hier war vor dem Hintergrund der Sorge hinzugefügt worden,
86     // dass die Nachricht wegen Verschluckern in Kafka / beim Strategiewechsel / ??
87     // doppelt aufschlägt...
88     if (existingMessage == null)
89     {
90       messages.put(message.getKey(), message);
91     }
92     else
93     {
94       if (!existingMessage.getMessageText().equals(message.getMessageText()))
95       {
96         throw new MessageMutationException(existingMessage, message.getMessageText());
97       }
98
99       // Warn and emit existing message
100       log.warn(
101           "Keeping existing message with {}@{} for {}",
102           existingMessage.getSerialNumber(),
103           existingMessage.getTimestamp(),
104           existingMessage.getKey());
105     }
106   }
107
108   @Override
109   synchronized public Mono<Message> getMessage(Message.MessageKey key)
110   {
111     // TODO: Aufrufe, auf eine Nachricht (einge gewisse Zeit) warten lassen
112     // und dann bedienen, wenn der der Callback vom Producer aufgerufen wird?
113     return Mono.fromSupplier(() -> messages.get(key));
114   }
115
116   @Override
117   synchronized public Flux<Message> getMessages(long first, long last)
118   {
119     return Flux.fromStream(messages
120       .values()
121       .stream()
122       .filter(message ->
123       {
124         long serial = message.getSerialNumber();
125         return serial >= first && serial <= last;
126       }));
127   }
128 }