1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.Message;
4 import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.producer.Producer;
7 import org.apache.kafka.clients.producer.ProducerRecord;
8 import org.apache.kafka.common.TopicPartition;
9 import reactor.core.publisher.Mono;
11 import java.time.LocalDateTime;
12 import java.time.ZoneOffset;
13 import java.util.UUID;
16 @RequiredArgsConstructor
18 class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
20 private final KafkaChatRoomService kafkaChatRoomService;
21 private final Producer<String, MessageTo> producer;
22 private final TopicPartition tp;
23 private final UUID chatRoomId;
24 private final ZoneOffset zoneOffset;
25 private final KafkaChatRoomService chatRoomService;
29 public Mono<Message> handleMessage(
30 Message.MessageKey key,
31 LocalDateTime timestamp,
34 return Mono.create(sink ->
36 ProducerRecord<String, MessageTo> record =
40 timestamp.toEpochSecond(zoneOffset),
41 chatRoomId.toString(),
42 MessageTo.of(key.getUsername(), key.getMessageId(), text));
44 producer.send(record, ((metadata, exception) ->
51 Message message = new Message(key, metadata.offset(), timestamp, text);
52 kafkaChatRoomService.addMessage(message);
60 sink.error(exception);