69b6fe9ba2cd92b111a133fdd62c30e8b774a802
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatRoomActiveMessageHandlingStrategy.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.Message;
4 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
5 import lombok.RequiredArgsConstructor;
6 import lombok.extern.log4j.Log4j;
7 import org.apache.kafka.clients.producer.Producer;
8 import org.apache.kafka.clients.producer.ProducerRecord;
9 import org.apache.kafka.common.TopicPartition;
10 import reactor.core.publisher.Mono;
11
12 import java.time.LocalDateTime;
13 import java.time.ZoneOffset;
14 import java.util.UUID;
15
16
17 @RequiredArgsConstructor
18 @Log4j
19 class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
20 {
21   private final Producer<String, MessageTo> producer;
22   private final TopicPartition tp;
23   private final UUID chatRoomId;
24   private final ZoneOffset zoneOffset;
25   private final KafkaChatRoomService chatRoomService;
26
27
28   @Override
29   public Mono<Message> persistMessage(
30       Message.MessageKey key,
31       LocalDateTime timestamp,
32       String text)
33   {
34     return Mono.create(sink ->
35     {
36       ProducerRecord<String, MessageTo> record =
37           new ProducerRecord<>(
38               tp.topic(),
39               tp.partition(),
40               timestamp.toEpochSecond(zoneOffset),
41               chatRoomId.toString(),
42               MessageTo.of(key.getUsername(), key.getMessageId(), text));
43
44       producer.send(record, ((metadata, exception) ->
45       {
46         if (metadata != null)
47         {
48           // On successful send
49           {
50             // Emit new message
51             message = new Message(key, metadata.offset(), timestamp, text);
52             messages.put(message.getKey(), message);
53           }
54
55           sink.success();
56         }
57         else
58         {
59           // On send-failure
60           sink.error(exception);
61         }
62       }));
63     });
64   }
65
66   @Override
67   public MessageHandlingStrategy handleMessage(Message message)
68   {
69   }
70 }