1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.clients.producer.Producer;
9 import org.apache.kafka.clients.producer.ProducerRecord;
10 import org.apache.kafka.clients.producer.RecordMetadata;
11 import org.apache.kafka.common.TopicPartition;
12 import reactor.core.publisher.Flux;
13 import reactor.core.publisher.Mono;
15 import java.time.LocalDateTime;
16 import java.time.ZoneOffset;
17 import java.util.LinkedHashMap;
18 import java.util.UUID;
19 import java.util.concurrent.Future;
23 @RequiredArgsConstructor
24 public class KafkaChatRoomService implements ChatRoomService
// Kafka producer that persistMessage uses to send each message record.
// Like the other final fields without an initializer, it is supplied through
// the constructor that Lombok generates for @RequiredArgsConstructor.
26 private final Producer<String, MessageTo> producer;
// NOTE(review): not referenced in the visible part of this listing; presumably
// the topic/partition the records of this chat room are written to - confirm.
27 private final TopicPartition tp;
// Id of the chat room; its string form is passed in when the record is built.
28 private final UUID chatRoomId;
// Offset used to convert the LocalDateTime timestamp of a message to epoch seconds.
29 private final ZoneOffset zoneOffset;
// Messages known to this instance, keyed by message key, kept in insertion order.
// NOTE(review): a plain LinkedHashMap is not thread-safe, and it is written from
// inside the producer.send callback while the getters read it - confirm that
// all accesses happen on one thread, or add synchronization.
31 private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
// NOTE(review): never read or written in the visible part of this listing.
// The lowercase `l` suffix is easy to misread as the digit 1; `0L` is clearer.
33 private long offset = 0l;
/**
 * Sends a chat message to Kafka and records it in the in-memory {@code messages} map.
 *
 * <p>Inside {@code Mono.create}, a {@code ProducerRecord} is built from the
 * timestamp, the chat-room id and a {@code MessageTo} made of the key's username,
 * the key's message id and the text, and is handed to {@code producer.send}
 * together with a callback. The callback looks up {@code key} in {@code messages}:
 * an existing message with equal text is kept, an existing message with different
 * text is reported to the sink as a {@link MessageMutationException}, and
 * otherwise a new {@code Message} carrying {@code metadata.offset()} is stored
 * under its key.
 *
 * <p>NOTE(review): several lines of this method are missing from this listing
 * (including the declaration of {@code text}, the null checks and the
 * success/failure signalling), so the branch structure described here is
 * inferred from the visible statements and comments - confirm against the full file.
 *
 * @param key identifies the message; supplies the username and the message id
 * @param timestamp creation time of the message
 * @return a {@code Mono} whose subscription triggers the send
 */
37 public Mono<Message> persistMessage(
38 Message.MessageKey key,
39 LocalDateTime timestamp,
42 return Mono.create(sink ->
44 ProducerRecord<String, MessageTo> record =
// NOTE(review): toEpochSecond yields seconds, whereas Kafka record timestamps
// are milliseconds since the epoch - if this argument is the record timestamp,
// the unit is off by a factor of 1000; confirm.
48 timestamp.toEpochSecond(zoneOffset),
49 chatRoomId.toString(),
50 MessageTo.of(key.getUsername(), key.getMessageId(), text));
52 producer.send(record, ((metadata, exception) ->
56 Message message = messages.get(key);
// NOTE(review): messages.get(key) returns null for an unknown key; the guard
// that must precede this dereference is not visible in this listing - confirm.
59 if (message.getMessageText().equals(text))
61 // Warn and emit existing message
63 "Keeping existing message with {}@{} for {}",
64 message.getSerialNumber(),
65 message.getTimestamp(), key);
69 // Emit error and abort
70 sink.error(new MessageMutationException(message, text));
// The record offset reported by Kafka is passed as the second constructor
// argument of the new Message (presumably its serial number - confirm).
77 message = new Message(key, metadata.offset(), timestamp, text);
78 messages.put(message.getKey(), message);
/**
 * Looks up a single message in the in-memory {@code messages} map.
 *
 * <p>The lookup is deferred until the returned {@code Mono} is subscribed to.
 * {@code Mono.fromSupplier} completes empty when the supplier returns
 * {@code null}, so an unknown key yields an empty {@code Mono}.
 *
 * @param key the key of the requested message
 * @return a {@code Mono} emitting the stored message, or empty if none is stored under {@code key}
 */
88 public Mono<Message> getMessage(Message.MessageKey key)
90 return Mono.fromSupplier(() -> messages.get(key));
94 public Flux<Message> getMessages(long first, long last)
96 return Flux.fromStream(messages
101 long serial = message.getSerialNumber();
102 return serial >= first && serial <= last;