WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatRoomService.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.clients.producer.Producer;
9 import org.apache.kafka.clients.producer.ProducerRecord;
10 import org.apache.kafka.clients.producer.RecordMetadata;
11 import org.apache.kafka.common.TopicPartition;
12 import reactor.core.publisher.Flux;
13 import reactor.core.publisher.Mono;
14
15 import java.time.LocalDateTime;
16 import java.time.ZoneOffset;
17 import java.util.LinkedHashMap;
18 import java.util.UUID;
19 import java.util.concurrent.Future;
20
21
22 @Slf4j
23 @RequiredArgsConstructor
24 public class KafkaChatRoomService implements ChatRoomService
25 {
26   private final Producer<String, MessageTo> producer;
27   private final TopicPartition tp;
28   private final UUID chatRoomId;
29   private final ZoneOffset zoneOffset;
30
31   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
32
33   private long offset = 0l;
34
35
36   @Override
37   public Mono<Message> persistMessage(
38     Message.MessageKey key,
39     LocalDateTime timestamp,
40     String text)
41   {
42     return Mono.create(sink ->
43     {
44       ProducerRecord<String, MessageTo> record =
45           new ProducerRecord<>(
46               tp.topic(),
47               tp.partition(),
48               timestamp.toEpochSecond(zoneOffset),
49               chatRoomId.toString(),
50               MessageTo.of(key.getUsername(), key.getMessageId(), text));
51
52       producer.send(record, ((metadata, exception) ->
53       {
54         if (metadata != null)
55         {
56           Message message = messages.get(key);
57           if (message != null)
58           {
59             if (message.getMessageText().equals(text))
60             {
61               // Warn and emit existing message
62               log.warn(
63                   "Keeping existing message with {}@{} for {}",
64                   message.getSerialNumber(),
65                   message.getTimestamp(), key);
66             }
67             else
68             {
69               // Emit error and abort
70               sink.error(new MessageMutationException(message, text));
71               return;
72             }
73           }
74           else
75           {
76             // Emit new message
77             message = new Message(key, metadata.offset(), timestamp, text);
78             messages.put(message.getKey(), message);
79           }
80
81           sink.success();
82         }
83       }));
84     });
85   }
86
87   @Override
88   public Mono<Message> getMessage(Message.MessageKey key)
89   {
90     return Mono.fromSupplier(() -> messages.get(key));
91   }
92
93   @Override
94   public Flux<Message> getMessages(long first, long last)
95   {
96     return Flux.fromStream(messages
97       .values()
98       .stream()
99       .filter(message ->
100       {
101         long serial = message.getSerialNumber();
102         return serial >= first && serial <= last;
103       }));
104   }
105 }