WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatRoomService.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.clients.producer.Producer;
8 import org.apache.kafka.clients.producer.ProducerRecord;
9 import org.apache.kafka.common.TopicPartition;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
12
13 import java.time.LocalDateTime;
14 import java.time.ZoneOffset;
15 import java.util.LinkedHashMap;
16 import java.util.UUID;
17
18
19 @Slf4j
20 public class KafkaChatRoomService implements ChatRoomService
21 {
22   private final Producer<String, MessageTo> producer;
23   private final TopicPartition tp;
24   private final UUID chatRoomId;
25   private final ZoneOffset zoneOffset;
26
27   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
28
29   private MessageHandlingStrategy strategy;
30
31
32   public KafkaChatRoomService(
33       Producer<String, MessageTo> producer,
34       TopicPartition tp,
35       UUID chatRoomId,
36       ZoneOffset zoneOffset)
37   {
38     this.producer = producer;
39     this.tp = tp;
40     this.chatRoomId = chatRoomId;
41     this.zoneOffset = zoneOffset;
42     this.strategy = new ChatroomInactiveMessageHandlingStrategy(tp);
43   }
44
45
46   @Override
47   public Mono<Message> persistMessage(
48     Message.MessageKey key,
49     LocalDateTime timestamp,
50     String text)
51   {
52     return strategy.persistMessage(key, timestamp, text);
53   }
54
55   synchronized protected void addMessage(Message message) throws MessageMutationException
56   {
57     Message existingMessage = messages.get(message.getKey());
58
59     if (existingMessage == null)
60     {
61       messages.put(existingMessage.getKey(), existingMessage);
62     }
63     else
64     {
65       if (!existingMessage.getMessageText().equals(message.getMessageText()))
66       {
67         throw new MessageMutationException(existingMessage, message.getMessageText());
68       }
69
70       // Warn and emit existing message
71       log.warn(
72           "Keeping existing message with {}@{} for {}",
73           existingMessage.getSerialNumber(),
74           existingMessage.getTimestamp(),
75           existingMessage.getKey());
76     }
77   }
78
79   @Override
80   synchronized public Mono<Message> getMessage(Message.MessageKey key)
81   {
82     return Mono.fromSupplier(() -> messages.get(key));
83   }
84
85   @Override
86   synchronized public Flux<Message> getMessages(long first, long last)
87   {
88     return Flux.fromStream(messages
89       .values()
90       .stream()
91       .filter(message ->
92       {
93         long serial = message.getSerialNumber();
94         return serial >= first && serial <= last;
95       }));
96   }
97 }