NEU
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatRoomChannel.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.*;
4 import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.ConsumerRecord;
8 import org.apache.kafka.clients.consumer.ConsumerRecords;
9 import org.apache.kafka.clients.producer.Producer;
10 import org.apache.kafka.clients.producer.ProducerRecord;
11 import org.apache.kafka.common.TopicPartition;
12 import org.apache.kafka.common.errors.RecordDeserializationException;
13 import org.apache.kafka.common.errors.WakeupException;
14 import reactor.core.publisher.Mono;
15
16 import java.time.*;
17 import java.util.List;
18 import java.util.UUID;
19
20
21 @RequiredArgsConstructor
22 @Slf4j
23 public class ChatRoomChannel implements Runnable
24 {
25   private final String topic;
26   private final Consumer<String, ChatRoomTo> consumer;
27   private final Producer<String, ChatRoomTo> producer;
28   private final ChatRoomFactory chatRoomFactory;
29
30   private boolean running;
31
32
33   @Override
34   public void run()
35   {
36     consumer.assign(List.of(new TopicPartition(topic, 0)));
37
38     running = true;
39
40     while (running)
41     {
42       try
43       {
44         ConsumerRecords<String, ChatRoomTo> records = consumer.poll(Duration.ofMinutes(5));
45         log.info("Fetched {} messages", records.count());
46
47         for (ConsumerRecord<String, ChatRoomTo> record : records)
48         {
49           UUID id = record.value().getId();
50           String name = record.value().getName();
51           chatRoomFactory.createChatRoom(id, name);
52         }
53       }
54       catch (WakeupException e)
55       {
56       }
57       catch (RecordDeserializationException e)
58       {
59       }
60     }
61   }
62
63   Mono<ChatRoomInfo> sendCreateChatRoomRequest(
64       UUID chatRoomId,
65       String name)
66   {
67     ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId, name);
68     return Mono.create(sink ->
69     {
70       ProducerRecord<String, ChatRoomTo> record =
71           new ProducerRecord<>(
72               topic,
73               chatRoomId.toString(),
74               chatRoomTo);
75
76       producer.send(record, ((metadata, exception) ->
77       {
78         if (metadata != null)
79         {
80           log.info("Successfully send chreate-request for chat room: {}", chatRoomTo);
81           sink.success(chatRoomTo.toChatRoomInfo());
82         }
83         else
84         {
85           // On send-failure
86           log.error(
87               "Could not send create-request for chat room (id={}, name={}): {}",
88               chatRoomId,
89               name,
90               exception);
91           sink.error(exception);
92         }
93       }));
94     });
95   }
96 }