1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.Message;
4 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
5 import lombok.RequiredArgsConstructor;
6 import lombok.extern.log4j.Log4j;
7 import org.apache.kafka.clients.producer.Producer;
8 import org.apache.kafka.clients.producer.ProducerRecord;
9 import org.apache.kafka.common.TopicPartition;
10 import reactor.core.publisher.Mono;
12 import java.time.LocalDateTime;
13 import java.time.ZoneOffset;
14 import java.util.UUID;
17 @RequiredArgsConstructor
19 class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
// Collaborators and configuration of this strategy. All fields are final; the
// Lombok @RequiredArgsConstructor on the class generates the matching constructor.

// Kafka producer used by persistMessage to send MessageTo values with String keys.
21 private final Producer<String, MessageTo> producer;
// Topic/partition handle. Presumably the target of the produced records; the
// line that uses it is not visible in this excerpt — TODO confirm.
22 private final TopicPartition tp;
// Id of the chat room; persistMessage passes its string form along with each record.
23 private final UUID chatRoomId;
// Offset used by persistMessage to convert the LocalDateTime timestamp to epoch seconds.
24 private final ZoneOffset zoneOffset;
// NOTE(review): not referenced in the visible portion of the class — confirm its role.
25 private final KafkaChatRoomService chatRoomService;
// Sends a chat message through the Kafka producer and reports the outcome of
// the send through the returned Mono.
//
// Parameters visible here: `key` (supplies username and message id) and
// `timestamp` (local time of the message). `text` is used below but declared on
// a line that is not visible in this excerpt — presumably a third parameter.
//
// NOTE(review): several interior lines of this method are missing from this view
// (record construction, callback branching). The comments below describe only
// the statements that are visible.
29 public Mono<Message> persistMessage(
30 Message.MessageKey key,
31 LocalDateTime timestamp,
// Bridge the callback-based producer API into Reactor via the sink handed to Mono.create.
34 return Mono.create(sink ->
// Record to be sent; its constructor call is on lines not visible here.
36 ProducerRecord<String, MessageTo> record =
// Local timestamp converted to epoch SECONDS using the configured offset.
// NOTE(review): if this is the record-timestamp argument, Kafka documents that
// value as milliseconds since epoch — confirm that seconds are intended and
// that readers of the topic interpret it the same way.
40 timestamp.toEpochSecond(zoneOffset),
// Chat room id in string form (presumably the record key — confirm).
41 chatRoomId.toString(),
// Payload built from the key's username and message id plus the message text.
42 MessageTo.of(key.getUsername(), key.getMessageId(), text));
// Asynchronous send; the callback receives the record metadata and/or an exception.
44 producer.send(record, ((metadata, exception) ->
// Builds the domain message, using the offset reported in the send metadata as
// the second constructor argument.
// NOTE(review): the declaration of `message` is not visible here.
51 message = new Message(key, metadata.offset(), timestamp, text);
// NOTE(review): `messages` is not declared in the visible part of this class —
// confirm where it comes from (or whether this line sits in commented-out code).
52 messages.put(message.getKey(), message);
// Forwards the exception from the send callback to the subscriber as an error
// signal (presumably in the failure branch, which is not visible here).
60 sink.error(exception);
67 public MessageHandlingStrategy handleMessage(Message message)