1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.Message;
4 import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.producer.Producer;
7 import org.apache.kafka.clients.producer.ProducerRecord;
8 import org.apache.kafka.common.TopicPartition;
9 import reactor.core.publisher.Mono;
11 import java.time.LocalDateTime;
12 import java.time.ZoneOffset;
13 import java.util.UUID;
18 * Currently the only active strategy!
19 * The {@link MessageHandlingStrategy} abstraction is probably not needed anymore: refactor!
21 @RequiredArgsConstructor
23 class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
25 private final KafkaChatRoomService kafkaChatRoomService;
26 private final Producer<String, MessageTo> producer;
27 private final TopicPartition tp;
28 private final UUID chatRoomId;
29 private final ZoneOffset zoneOffset;
33 public Mono<Message> handleMessage(
34 Message.MessageKey key,
35 LocalDateTime timestamp,
38 return Mono.create(sink ->
40 ProducerRecord<String, MessageTo> record =
44 timestamp.toEpochSecond(zoneOffset),
45 chatRoomId.toString(),
46 MessageTo.of(key.getUsername(), key.getMessageId(), text));
48 producer.send(record, ((metadata, exception) ->
55 Message message = new Message(key, metadata.offset(), timestamp, text);
56 kafkaChatRoomService.addMessage(message);
64 sink.error(exception);