NEU
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatRoomChannel.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.*;
4 import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.ConsumerRecord;
8 import org.apache.kafka.clients.consumer.ConsumerRecords;
9 import org.apache.kafka.clients.producer.Producer;
10 import org.apache.kafka.clients.producer.ProducerRecord;
11 import org.apache.kafka.common.TopicPartition;
12 import org.apache.kafka.common.errors.RecordDeserializationException;
13 import org.apache.kafka.common.errors.WakeupException;
14 import reactor.core.publisher.Mono;
15
16 import java.time.*;
17 import java.util.List;
18 import java.util.UUID;
19
20
21 @RequiredArgsConstructor
22 @Slf4j
23 public class ChatRoomChannel implements Runnable
24 {
25   private final String topic;
26   private final Consumer<Integer, ChatRoomTo> consumer;
27   private final Producer<Integer, ChatRoomTo> producer;
28   private final ShardingStrategy shardingStrategy;
29   private final ChatMessageChannel chatMessageChannel;
30   private final Clock clock;
31   private final int bufferSize;
32
33   private boolean running;
34
35
36   Mono<ChatRoomInfo> sendCreateChatRoomRequest(
37       UUID chatRoomId,
38       String name)
39   {
40     int shard = this.shardingStrategy.selectShard(chatRoomId);
41     ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId, name, shard);
42     return Mono.create(sink ->
43     {
44       ProducerRecord<Integer, ChatRoomTo> record =
45           new ProducerRecord<>(
46               topic,
47               shard,
48               chatRoomTo);
49
50       producer.send(record, ((metadata, exception) ->
51       {
52         if (metadata != null)
53         {
54           log.info("Successfully send chreate-request for chat room: {}", chatRoomTo);
55           sink.success(chatRoomTo.toChatRoomInfo());
56         }
57         else
58         {
59           // On send-failure
60           log.error(
61               "Could not send create-request for chat room (id={}, name={}): {}",
62               chatRoomId,
63               name,
64               exception);
65           sink.error(exception);
66         }
67       }));
68     });
69   }
70
71   @Override
72   public void run()
73   {
74     consumer.assign(List.of(new TopicPartition(topic, 0)));
75
76     running = true;
77
78     while (running)
79     {
80       try
81       {
82         ConsumerRecords<Integer, ChatRoomTo> records = consumer.poll(Duration.ofMinutes(5));
83         log.info("Fetched {} messages", records.count());
84
85         for (ConsumerRecord<Integer, ChatRoomTo> record : records)
86         {
87           createChatRoom(record.value().toChatRoomInfo());
88         }
89       }
90       catch (WakeupException e)
91       {
92         log.info("Received WakeupException, exiting!");
93         running = false;
94       }
95     }
96   }
97
98   void createChatRoom(ChatRoomInfo chatRoomInfo)
99   {
100     UUID id = chatRoomInfo.getId();
101     String name = chatRoomInfo.getName();
102     int shard = chatRoomInfo.getShard();
103     log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
104     KafkaChatRoomService service = new KafkaChatRoomService(chatMessageChannel, id);
105     ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
106     chatMessageChannel.putChatRoom(chatRoom);
107   }
108 }