562e2df89cba52b1a8639a59270353d2a9e192e6
[demos/kafka/chat] /
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.Message;
4 import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.producer.Producer;
7 import org.apache.kafka.clients.producer.ProducerRecord;
8 import org.apache.kafka.common.TopicPartition;
9 import reactor.core.publisher.Mono;
10
11 import java.time.LocalDateTime;
12 import java.time.ZoneOffset;
13 import java.util.UUID;
14
15
16 /**
17  * TODO:
18  * Actual the only active strategy!
19  * {@link MessageHandlingStrategy} probably not needed: Refactor!
20  */
21 @RequiredArgsConstructor
22 @Slf4j
23 class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
24 {
25   private final KafkaChatRoomService kafkaChatRoomService;
26   private final Producer<String, MessageTo> producer;
27   private final TopicPartition tp;
28   private final UUID chatRoomId;
29   private final ZoneOffset zoneOffset;
30
31
32   @Override
33   public Mono<Message> handleMessage(
34       Message.MessageKey key,
35       LocalDateTime timestamp,
36       String text)
37   {
38     return Mono.create(sink ->
39     {
40       ProducerRecord<String, MessageTo> record =
41           new ProducerRecord<>(
42               tp.topic(),
43               tp.partition(),
44               timestamp.toEpochSecond(zoneOffset),
45               chatRoomId.toString(),
46               MessageTo.of(key.getUsername(), key.getMessageId(), text));
47
48       producer.send(record, ((metadata, exception) ->
49       {
50         if (metadata != null)
51         {
52           // On successful send
53           {
54             // Emit new message
55             Message message = new Message(key, metadata.offset(), timestamp, text);
56             kafkaChatRoomService.addMessage(message);
57           }
58
59           sink.success();
60         }
61         else
62         {
63           // On send-failure
64           sink.error(exception);
65         }
66       }));
67     });
68   }
69 }