NEU
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatRoomChannel.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.*;
4 import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.ConsumerRecord;
8 import org.apache.kafka.clients.consumer.ConsumerRecords;
9 import org.apache.kafka.clients.producer.Producer;
10 import org.apache.kafka.clients.producer.ProducerRecord;
11 import org.apache.kafka.common.TopicPartition;
12 import org.apache.kafka.common.errors.WakeupException;
13 import reactor.core.publisher.Mono;
14
15 import java.time.*;
16 import java.util.List;
17 import java.util.UUID;
18
19
20 @RequiredArgsConstructor
21 @Slf4j
22 public class ChatRoomChannel implements Runnable
23 {
24   private final String topic;
25   private final Producer<Integer, ChatRoomTo> producer;
26   private final Consumer<Integer, ChatRoomTo> consumer;
27   private final ShardingStrategy shardingStrategy;
28   private final ChatMessageChannel chatMessageChannel;
29   private final Clock clock;
30   private final int bufferSize;
31
32   private boolean running;
33
34
35   Mono<ChatRoomInfo> sendCreateChatRoomRequest(
36       UUID chatRoomId,
37       String name)
38   {
39     int shard = this.shardingStrategy.selectShard(chatRoomId);
40     ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId.toString(), name, shard);
41     return Mono.create(sink ->
42     {
43       ProducerRecord<Integer, ChatRoomTo> record =
44           new ProducerRecord<>(
45               topic,
46               shard,
47               chatRoomTo);
48
49       producer.send(record, ((metadata, exception) ->
50       {
51         if (metadata != null)
52         {
53           log.info("Successfully send chreate-request for chat room: {}", chatRoomTo);
54           sink.success(chatRoomTo.toChatRoomInfo());
55         }
56         else
57         {
58           // On send-failure
59           log.error(
60               "Could not send create-request for chat room (id={}, name={}): {}",
61               chatRoomId,
62               name,
63               exception);
64           sink.error(exception);
65         }
66       }));
67     });
68   }
69
70   @Override
71   public void run()
72   {
73     consumer.assign(List.of(new TopicPartition(topic, 0)));
74
75     running = true;
76
77     while (running)
78     {
79       try
80       {
81         ConsumerRecords<Integer, ChatRoomTo> records = consumer.poll(Duration.ofMinutes(5));
82         log.info("Fetched {} messages", records.count());
83
84         for (ConsumerRecord<Integer, ChatRoomTo> record : records)
85         {
86           createChatRoom(record.value().toChatRoomInfo());
87         }
88       }
89       catch (WakeupException e)
90       {
91         log.info("Received WakeupException, exiting!");
92         running = false;
93       }
94     }
95
96     log.info("Exiting normally");
97   }
98
99
100   void createChatRoom(ChatRoomInfo chatRoomInfo)
101   {
102     UUID id = chatRoomInfo.getId();
103     String name = chatRoomInfo.getName();
104     int shard = chatRoomInfo.getShard();
105     log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
106     KafkaChatRoomService service = new KafkaChatRoomService(chatMessageChannel, id);
107     ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
108     chatMessageChannel.putChatRoom(chatRoom);
109   }
110 }