b08a50d2a6c78a3309e3bb347005af065ec9ff7e
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatRoomChannel.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.*;
4 import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.ConsumerRecord;
8 import org.apache.kafka.clients.consumer.ConsumerRecords;
9 import org.apache.kafka.clients.producer.Producer;
10 import org.apache.kafka.clients.producer.ProducerRecord;
11 import org.apache.kafka.common.TopicPartition;
12 import org.apache.kafka.common.errors.RecordDeserializationException;
13 import org.apache.kafka.common.errors.WakeupException;
14 import reactor.core.publisher.Mono;
15
16 import java.time.*;
17 import java.util.List;
18 import java.util.UUID;
19
20
21 @RequiredArgsConstructor
22 @Slf4j
23 public class ChatRoomChannel implements Runnable
24 {
25   private final String topic;
26   private final Consumer<Integer, ChatRoomTo> consumer;
27   private final Producer<Integer, ChatRoomTo> producer;
28   private final ShardingStrategy shardingStrategy;
29   private final ChatMessageChannel chatMessageChannel;
30
31   private boolean running;
32
33
34   @Override
35   public void run()
36   {
37     consumer.assign(List.of(new TopicPartition(topic, 0)));
38
39     running = true;
40
41     while (running)
42     {
43       try
44       {
45         ConsumerRecords<Integer, ChatRoomTo> records = consumer.poll(Duration.ofMinutes(5));
46         log.info("Fetched {} messages", records.count());
47
48         for (ConsumerRecord<Integer, ChatRoomTo> record : records)
49         {
50           UUID id = record.value().getId();
51           String name = record.value().getName();
52           chatRoomFactory.createChatRoom(id, name);
53         }
54       }
55       catch (WakeupException e)
56       {
57       }
58       catch (RecordDeserializationException e)
59       {
60       }
61     }
62   }
63
64   void createChatRoom()
65   {
66     log.info("Creating ChatRoom with buffer-size {}", bufferSize);
67     KafkaChatRoomService service = new KafkaChatRoomService(chatMessageChannel, id);
68     int shard = shardingStrategy.selectShard(id);
69     ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
70     chatMessageChannel.putChatRoom(chatRoom);
71   }
72
73   Mono<ChatRoomInfo> sendCreateChatRoomRequest(
74       UUID chatRoomId,
75       String name)
76   {
77     int shard = this.shardingStrategy.selectShard(chatRoomId);
78     ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId, name, shard);
79     return Mono.create(sink ->
80     {
81       ProducerRecord<Integer, ChatRoomTo> record =
82           new ProducerRecord<>(
83               topic,
84               shard,
85               chatRoomTo);
86
87       producer.send(record, ((metadata, exception) ->
88       {
89         if (metadata != null)
90         {
91           log.info("Successfully send chreate-request for chat room: {}", chatRoomTo);
92           sink.success(chatRoomTo.toChatRoomInfo());
93         }
94         else
95         {
96           // On send-failure
97           log.error(
98               "Could not send create-request for chat room (id={}, name={}): {}",
99               chatRoomId,
100               name,
101               exception);
102           sink.error(exception);
103         }
104       }));
105     });
106   }
107 }