8bbc82ec1befd788d1c9c65a620aeabdf8fc1082
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatRoomChannel.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.*;
4 import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.ConsumerRecord;
8 import org.apache.kafka.clients.consumer.ConsumerRecords;
9 import org.apache.kafka.clients.producer.Producer;
10 import org.apache.kafka.clients.producer.ProducerRecord;
11 import org.apache.kafka.common.TopicPartition;
12 import org.apache.kafka.common.errors.WakeupException;
13 import reactor.core.publisher.Mono;
14
15 import java.time.*;
16 import java.util.List;
17 import java.util.UUID;
18
19
20 @RequiredArgsConstructor
21 @Slf4j
22 public class ChatRoomChannel implements Runnable
23 {
24   private final String topic;
25   private final Producer<Integer, CreateChatRoomRequestTo> producer;
26   private final Consumer<Integer, CreateChatRoomRequestTo> consumer;
27   private final ShardingStrategy shardingStrategy;
28   private final ChatMessageChannel chatMessageChannel;
29   private final Clock clock;
30   private final int bufferSize;
31
32   private boolean running;
33
34
35   @Override
36   public void run()
37   {
38     consumer.assign(List.of(new TopicPartition(topic, 0)));
39
40     running = true;
41
42     while (running)
43     {
44       try
45       {
46         ConsumerRecords<Integer, CreateChatRoomRequestTo> records = consumer.poll(Duration.ofMinutes(5));
47         log.info("Fetched {} messages", records.count());
48
49         for (ConsumerRecord<Integer, CreateChatRoomRequestTo> record : records)
50         {
51           createChatRoom(record.value().toChatRoomInfo());
52         }
53       }
54       catch (WakeupException e)
55       {
56         log.info("Received WakeupException, exiting!");
57         running = false;
58       }
59     }
60
61     log.info("Exiting normally");
62   }
63
64
65   void createChatRoom(ChatRoomInfo chatRoomInfo)
66   {
67     UUID id = chatRoomInfo.getId();
68     String name = chatRoomInfo.getName();
69     int shard = chatRoomInfo.getShard();
70     log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
71     KafkaChatRoomService service = new KafkaChatRoomService(chatMessageChannel, id);
72     ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
73     chatMessageChannel.putChatRoom(chatRoom);
74   }
75 }