package de.juplo.kafka.chat.backend.persistence.kafka;
import de.juplo.kafka.chat.backend.domain.ChatRoomService;
-import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.MessageMutationException;
-import lombok.RequiredArgsConstructor;
+import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.LocalDateTime;
-import java.time.ZoneOffset;
import java.util.LinkedHashMap;
import java.util.UUID;
-@Slf4j
@RequiredArgsConstructor
+@Slf4j
public class KafkaChatRoomService implements ChatRoomService
{
- private final Producer<String, MessageTo> producer;
- private final TopicPartition tp;
+ private final ChatMessageChannel chatMessageChannel;
private final UUID chatRoomId;
- private final ZoneOffset zoneOffset;
private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
- private long offset = 0l;
-
@Override
public Mono<Message> persistMessage(
LocalDateTime timestamp,
String text)
{
- return Mono.create(sink ->
- {
- ProducerRecord<String, MessageTo> record =
- new ProducerRecord<>(
- tp.topic(),
- tp.partition(),
- timestamp.toEpochSecond(zoneOffset),
- chatRoomId.toString(),
- MessageTo.of(key.getUsername(), key.getMessageId(), text));
-
- producer.send(record, ((metadata, exception) ->
- {
- if (metadata != null)
- {
- // On successful send
- Message message = messages.get(key);
- if (message != null)
- {
- if (message.getMessageText().equals(text))
- {
- // Warn and emit existing message
- log.warn(
- "Keeping existing message with {}@{} for {}",
- message.getSerialNumber(),
- message.getTimestamp(), key);
- }
- else
- {
- // Emit error and abort
- sink.error(new MessageMutationException(message, text));
- return;
- }
- }
- else
- {
- // Emit new message
- message = new Message(key, metadata.offset(), timestamp, text);
- messages.put(message.getKey(), message);
- }
+ return chatMessageChannel
+ .sendChatMessage(chatRoomId, key, timestamp, text)
+ .doOnSuccess(message -> persistMessage(message));
+ }
- sink.success();
- }
- else
- {
- // On send-failure
- sink.error(exception);
- }
- }));
- });
+ void persistMessage(Message message)
+ {
+ messages.put(message.getKey(), message);
}
@Override
- public Mono<Message> getMessage(Message.MessageKey key)
+ synchronized public Mono<Message> getMessage(Message.MessageKey key)
{
return Mono.fromSupplier(() -> messages.get(key));
}
@Override
- public Flux<Message> getMessages(long first, long last)
+ synchronized public Flux<Message> getMessages(long first, long last)
{
return Flux.fromStream(messages
.values()