WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatRoomService.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.clients.producer.Producer;
9 import org.apache.kafka.clients.producer.ProducerRecord;
10 import org.apache.kafka.common.TopicPartition;
11 import reactor.core.publisher.Flux;
12 import reactor.core.publisher.Mono;
13
14 import java.time.LocalDateTime;
15 import java.time.ZoneOffset;
16 import java.util.LinkedHashMap;
17 import java.util.UUID;
18
19
20 @Slf4j
21 @RequiredArgsConstructor
22 public class KafkaChatRoomService implements ChatRoomService
23 {
24   private final Producer<String, MessageTo> producer;
25   private final TopicPartition tp;
26   private final UUID chatRoomId;
27   private final ZoneOffset zoneOffset;
28
29   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
30
31   private long offset = 0l;
32
33
34   @Override
35   public Mono<Message> persistMessage(
36     Message.MessageKey key,
37     LocalDateTime timestamp,
38     String text)
39   {
40     return Mono.create(sink ->
41     {
42       ProducerRecord<String, MessageTo> record =
43           new ProducerRecord<>(
44               tp.topic(),
45               tp.partition(),
46               timestamp.toEpochSecond(zoneOffset),
47               chatRoomId.toString(),
48               MessageTo.of(key.getUsername(), key.getMessageId(), text));
49
50       producer.send(record, ((metadata, exception) ->
51       {
52         if (metadata != null)
53         {
54           // On successful send
55           Message message = messages.get(key);
56           if (message != null)
57           {
58             if (message.getMessageText().equals(text))
59             {
60               // Warn and emit existing message
61               log.warn(
62                   "Keeping existing message with {}@{} for {}",
63                   message.getSerialNumber(),
64                   message.getTimestamp(), key);
65             }
66             else
67             {
68               // Emit error and abort
69               sink.error(new MessageMutationException(message, text));
70               return;
71             }
72           }
73           else
74           {
75             // Emit new message
76             message = new Message(key, metadata.offset(), timestamp, text);
77             messages.put(message.getKey(), message);
78           }
79
80           sink.success();
81         }
82         else
83         {
84           // On send-failure
85           sink.error(exception);
86         }
87       }));
88     });
89   }
90
91   @Override
92   public Mono<Message> getMessage(Message.MessageKey key)
93   {
94     return Mono.fromSupplier(() -> messages.get(key));
95   }
96
97   @Override
98   public Flux<Message> getMessages(long first, long last)
99   {
100     return Flux.fromStream(messages
101       .values()
102       .stream()
103       .filter(message ->
104       {
105         long serial = message.getSerialNumber();
106         return serial >= first && serial <= last;
107       }));
108   }
109 }