1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.clients.producer.Producer;
8 import org.apache.kafka.clients.producer.ProducerRecord;
9 import org.apache.kafka.common.TopicPartition;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
13 import java.time.LocalDateTime;
14 import java.time.ZoneOffset;
15 import java.util.LinkedHashMap;
16 import java.util.UUID;
20 public class KafkaChatRoomService implements ChatRoomService
// Kafka producer handed in through the constructor; how it is used is not
// visible in this part of the file.
private final Producer<String, MessageTo> producer;
// Topic partition handed in through the constructor; also passed to the
// initial message handling strategy.
private final TopicPartition tp;
// Identifier of the chat room, set once in the constructor.
private final UUID chatRoomId;
// Zone offset set once in the constructor.
// NOTE(review): presumably used to convert message timestamps - confirm against the usages.
private final ZoneOffset zoneOffset;
// In-memory store of the messages, indexed by message key. A LinkedHashMap
// iterates its entries in insertion order.
private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
// Strategy that persistMessage() delegates to. Not final; the constructor
// installs a ChatroomInactiveMessageHandlingStrategy.
// NOTE(review): presumably replaced when the chat room changes state - confirm.
private MessageHandlingStrategy strategy;
32 public KafkaChatRoomService(
33 Producer<String, MessageTo> producer,
36 ZoneOffset zoneOffset)
38 this.producer = producer;
40 this.chatRoomId = chatRoomId;
41 this.zoneOffset = zoneOffset;
42 this.strategy = new ChatroomInactiveMessageHandlingStrategy(tp);
47 public Mono<Message> persistMessage(
48 Message.MessageKey key,
49 LocalDateTime timestamp,
52 return strategy.persistMessage(key, timestamp, text);
55 synchronized protected void addMessage(Message message) throws MessageMutationException
57 Message existingMessage = messages.get(message.getKey());
59 if (existingMessage == null)
61 messages.put(existingMessage.getKey(), existingMessage);
65 if (!existingMessage.getMessageText().equals(message.getMessageText()))
67 throw new MessageMutationException(existingMessage, message.getMessageText());
70 // Warn and emit existing message
72 "Keeping existing message with {}@{} for {}",
73 existingMessage.getSerialNumber(),
74 existingMessage.getTimestamp(),
75 existingMessage.getKey());
80 synchronized public Mono<Message> getMessage(Message.MessageKey key)
82 return Mono.fromSupplier(() -> messages.get(key));
86 synchronized public Flux<Message> getMessages(long first, long last)
88 return Flux.fromStream(messages
93 long serial = message.getSerialNumber();
94 return serial >= first && serial <= last;