NEU
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatRoomActiveMessageHandlingStrategy.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.Message;
4 import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.producer.Producer;
7 import org.apache.kafka.clients.producer.ProducerRecord;
8 import org.apache.kafka.common.TopicPartition;
9 import reactor.core.publisher.Mono;
10
11 import java.time.LocalDateTime;
12 import java.time.ZoneOffset;
13 import java.util.UUID;
14
15
16 /**
17  * Derzeit eigentlich einzige aktive Strategie!
18  * Rückbau?!?!
19  */
20 @RequiredArgsConstructor
21 @Slf4j
22 class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
23 {
24   private final KafkaChatRoomService kafkaChatRoomService;
25   private final Producer<String, MessageTo> producer;
26   private final TopicPartition tp;
27   private final UUID chatRoomId;
28   private final ZoneOffset zoneOffset;
29
30
31   @Override
32   public Mono<Message> handleMessage(
33       Message.MessageKey key,
34       LocalDateTime timestamp,
35       String text)
36   {
37     return Mono.create(sink ->
38     {
39       ProducerRecord<String, MessageTo> record =
40           new ProducerRecord<>(
41               tp.topic(),
42               tp.partition(),
43               timestamp.toEpochSecond(zoneOffset),
44               chatRoomId.toString(),
45               MessageTo.of(key.getUsername(), key.getMessageId(), text));
46
47       producer.send(record, ((metadata, exception) ->
48       {
49         if (metadata != null)
50         {
51           // On successful send
52           {
53             // Emit new message
54             Message message = new Message(key, metadata.offset(), timestamp, text);
55             kafkaChatRoomService.addMessage(message);
56           }
57
58           sink.success();
59         }
60         else
61         {
62           // On send-failure
63           sink.error(exception);
64         }
65       }));
66     });
67   }
68 }