cfd0e4ecce5df39bed1a9780cfbdbbd152b6714d
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatRoomActiveMessageHandlingStrategy.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.Message;
4 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
5 import lombok.RequiredArgsConstructor;
6 import lombok.extern.log4j.Log4j;
7 import org.apache.kafka.clients.producer.Producer;
8 import org.apache.kafka.clients.producer.ProducerRecord;
9 import org.apache.kafka.common.TopicPartition;
10 import reactor.core.publisher.Mono;
11
12 import java.time.LocalDateTime;
13 import java.time.ZoneOffset;
14 import java.util.UUID;
15
16
17 @RequiredArgsConstructor
18 @Log4j
19 class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
20 {
21   private final KafkaChatRoomService kafkaChatRoomService;
22   private final Producer<String, MessageTo> producer;
23   private final TopicPartition tp;
24   private final UUID chatRoomId;
25   private final ZoneOffset zoneOffset;
26   private final KafkaChatRoomService chatRoomService;
27
28
29   @Override
30   public Mono<Message> handleMessage(
31       Message.MessageKey key,
32       LocalDateTime timestamp,
33       String text)
34   {
35     return Mono.create(sink ->
36     {
37       ProducerRecord<String, MessageTo> record =
38           new ProducerRecord<>(
39               tp.topic(),
40               tp.partition(),
41               timestamp.toEpochSecond(zoneOffset),
42               chatRoomId.toString(),
43               MessageTo.of(key.getUsername(), key.getMessageId(), text));
44
45       producer.send(record, ((metadata, exception) ->
46       {
47         if (metadata != null)
48         {
49           // On successful send
50           {
51             // Emit new message
52             Message message = new Message(key, metadata.offset(), timestamp, text);
53             kafkaChatRoomService.addMessage(message);
54           }
55
56           sink.success();
57         }
58         else
59         {
60           // On send-failure
61           sink.error(exception);
62         }
63       }));
64     });
65   }
66 }