9ea23b13126afa753fc4290e54e7aa6ef7fc40aa
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatRoomChannel.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.*;
4 import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.ConsumerRecord;
8 import org.apache.kafka.clients.consumer.ConsumerRecords;
9 import org.apache.kafka.clients.producer.Producer;
10 import org.apache.kafka.clients.producer.ProducerRecord;
11 import org.apache.kafka.common.TopicPartition;
12 import org.apache.kafka.common.errors.WakeupException;
13 import reactor.core.publisher.Mono;
14
15 import java.time.*;
16 import java.util.List;
17 import java.util.Optional;
18 import java.util.UUID;
19 import java.util.concurrent.Callable;
20
21
22 @RequiredArgsConstructor
23 @Slf4j
24 public class ChatRoomChannel implements Callable<Optional<Exception>>
25 {
26   private final String topic;
27   private final Producer<Integer, ChatRoomTo> producer;
28   private final Consumer<Integer, ChatRoomTo> consumer;
29   private final ShardingStrategy shardingStrategy;
30   private final ChatMessageChannel chatMessageChannel;
31   private final Clock clock;
32   private final int bufferSize;
33
34   private boolean running;
35
36
37   Mono<ChatRoomInfo> sendCreateChatRoomRequest(
38       UUID chatRoomId,
39       String name)
40   {
41     int shard = this.shardingStrategy.selectShard(chatRoomId);
42     ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId, name, shard);
43     return Mono.create(sink ->
44     {
45       ProducerRecord<Integer, ChatRoomTo> record =
46           new ProducerRecord<>(
47               topic,
48               shard,
49               chatRoomTo);
50
51       producer.send(record, ((metadata, exception) ->
52       {
53         if (metadata != null)
54         {
55           log.info("Successfully send chreate-request for chat room: {}", chatRoomTo);
56           sink.success(chatRoomTo.toChatRoomInfo());
57         }
58         else
59         {
60           // On send-failure
61           log.error(
62               "Could not send create-request for chat room (id={}, name={}): {}",
63               chatRoomId,
64               name,
65               exception);
66           sink.error(exception);
67         }
68       }));
69     });
70   }
71
72   @Override
73   public Optional<Exception> call()
74   {
75     consumer.assign(List.of(new TopicPartition(topic, 0)));
76
77     running = true;
78
79     while (running)
80     {
81       try
82       {
83         ConsumerRecords<Integer, ChatRoomTo> records = consumer.poll(Duration.ofMinutes(5));
84         log.info("Fetched {} messages", records.count());
85
86         for (ConsumerRecord<Integer, ChatRoomTo> record : records)
87         {
88           createChatRoom(record.value().toChatRoomInfo());
89         }
90       }
91       catch (WakeupException e)
92       {
93         log.info("Received WakeupException, exiting!");
94         running = false;
95       }
96       catch (Exception e)
97       {
98         log.error("Exiting abnormally!");
99         return Optional.of(e);
100       }
101     }
102
103     log.info("Exiting normally");
104     return Optional.empty();
105   }
106
107
108   void createChatRoom(ChatRoomInfo chatRoomInfo)
109   {
110     UUID id = chatRoomInfo.getId();
111     String name = chatRoomInfo.getName();
112     int shard = chatRoomInfo.getShard();
113     log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
114     KafkaChatRoomService service = new KafkaChatRoomService(chatMessageChannel, id);
115     ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
116     chatMessageChannel.putChatRoom(chatRoom);
117   }
118 }