04564b9c62813dc5f092486f90100506f531a44c
[demos/kafka/chat] /
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.Message;
4 import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.producer.Producer;
7 import org.apache.kafka.clients.producer.ProducerRecord;
8 import org.apache.kafka.common.TopicPartition;
9 import reactor.core.publisher.Mono;
10
11 import java.time.LocalDateTime;
12 import java.time.ZoneOffset;
13 import java.util.UUID;
14
15
16 @RequiredArgsConstructor
17 @Slf4j
18 class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
19 {
20   private final KafkaChatRoomService kafkaChatRoomService;
21   private final Producer<String, MessageTo> producer;
22   private final TopicPartition tp;
23   private final UUID chatRoomId;
24   private final ZoneOffset zoneOffset;
25   private final KafkaChatRoomService chatRoomService;
26
27
28   @Override
29   public Mono<Message> handleMessage(
30       Message.MessageKey key,
31       LocalDateTime timestamp,
32       String text)
33   {
34     return Mono.create(sink ->
35     {
36       ProducerRecord<String, MessageTo> record =
37           new ProducerRecord<>(
38               tp.topic(),
39               tp.partition(),
40               timestamp.toEpochSecond(zoneOffset),
41               chatRoomId.toString(),
42               MessageTo.of(key.getUsername(), key.getMessageId(), text));
43
44       producer.send(record, ((metadata, exception) ->
45       {
46         if (metadata != null)
47         {
48           // On successful send
49           {
50             // Emit new message
51             Message message = new Message(key, metadata.offset(), timestamp, text);
52             kafkaChatRoomService.addMessage(message);
53           }
54
55           sink.success();
56         }
57         else
58         {
59           // On send-failure
60           sink.error(exception);
61         }
62       }));
63     });
64   }
65 }