1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.Message;
4 import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.producer.Producer;
7 import org.apache.kafka.clients.producer.ProducerRecord;
8 import org.apache.kafka.common.TopicPartition;
9 import reactor.core.publisher.Mono;
11 import java.time.LocalDateTime;
12 import java.time.ZoneOffset;
13 import java.util.UUID;
17 * Derzeit eigentlich einzige aktive Strategie!
20 @RequiredArgsConstructor
22 class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
24 private final KafkaChatRoomService kafkaChatRoomService;
25 private final Producer<String, MessageTo> producer;
26 private final TopicPartition tp;
27 private final UUID chatRoomId;
28 private final ZoneOffset zoneOffset;
32 public Mono<Message> handleMessage(
33 Message.MessageKey key,
34 LocalDateTime timestamp,
37 return Mono.create(sink ->
39 ProducerRecord<String, MessageTo> record =
43 timestamp.toEpochSecond(zoneOffset),
44 chatRoomId.toString(),
45 MessageTo.of(key.getUsername(), key.getMessageId(), text));
47 producer.send(record, ((metadata, exception) ->
54 Message message = new Message(key, metadata.offset(), timestamp, text);
55 kafkaChatRoomService.addMessage(message);
63 sink.error(exception);