1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.Message;
4 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
5 import lombok.RequiredArgsConstructor;
6 import lombok.extern.log4j.Log4j;
7 import org.apache.kafka.clients.producer.Producer;
8 import org.apache.kafka.clients.producer.ProducerRecord;
9 import org.apache.kafka.common.TopicPartition;
10 import reactor.core.publisher.Mono;
12 import java.time.LocalDateTime;
13 import java.time.ZoneOffset;
14 import java.util.UUID;
17 @RequiredArgsConstructor
19 class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
21 private final KafkaChatRoomService kafkaChatRoomService;
22 private final Producer<String, MessageTo> producer;
23 private final TopicPartition tp;
24 private final UUID chatRoomId;
25 private final ZoneOffset zoneOffset;
26 private final KafkaChatRoomService chatRoomService;
30 public Mono<Message> handleMessage(
31 Message.MessageKey key,
32 LocalDateTime timestamp,
35 return Mono.create(sink ->
37 ProducerRecord<String, MessageTo> record =
41 timestamp.toEpochSecond(zoneOffset),
42 chatRoomId.toString(),
43 MessageTo.of(key.getUsername(), key.getMessageId(), text));
45 producer.send(record, ((metadata, exception) ->
52 Message message = new Message(key, metadata.offset(), timestamp, text);
53 kafkaChatRoomService.addMessage(message);
61 sink.error(exception);