0c553e69fce5847cb0e95e70b288004b40b82b0d
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatRoomChannel.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.*;
4 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
8 import org.apache.kafka.clients.consumer.ConsumerRecord;
9 import org.apache.kafka.clients.consumer.ConsumerRecords;
10 import org.apache.kafka.clients.producer.Producer;
11 import org.apache.kafka.clients.producer.ProducerRecord;
12 import org.apache.kafka.common.TopicPartition;
13 import org.apache.kafka.common.errors.RecordDeserializationException;
14 import org.apache.kafka.common.errors.WakeupException;
15 import reactor.core.publisher.Flux;
16 import reactor.core.publisher.Mono;
17
18 import java.time.*;
19 import java.util.Collection;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.UUID;
23 import java.util.stream.IntStream;
24
25
26 @Slf4j
27 public class ChatRoomChannel implements Runnable
28 {
29   private final String topic;
30   private final Consumer<Integer, ChatRoomTo> consumer;
31   private final Producer<Integer, ChatRoomTo> producer;
32   private final ShardingStrategy shardingStrategy;
33   private final ChatMessageChannel chatMessageChannel;
34
35   private boolean running;
36
37
38   public ChatRoomChannel(
39     String topic,
40     Consumer<Integer, ChatRoomTo> consumer,
41     Producer<Integer, ChatRoomTo> producer,
42     int numShards,
43     ChatMessageChannel chatMessageChannel)
44   {
45     log.debug(
46         "Creating ChatRoomChannel for topic {} with sharding for {} partitions",
47         topic,
48         numShards);
49     this.topic = topic;
50     this.consumer = consumer;
51     this.producer = producer;
52     this.shardingStrategy = new KafkaLikeShardingStrategy(numShards);
53     this.chatMessageChannel = chatMessageChannel;
54   }
55
56
57   @Override
58   public void run()
59   {
60     consumer.assign(List.of(new TopicPartition(topic, 0)));
61
62     running = true;
63
64     while (running)
65     {
66       try
67       {
68         ConsumerRecords<String, MessageTo> records = consumer.poll(Duration.ofMinutes(5));
69         log.info("Fetched {} messages", records.count());
70
71       }
72       catch (WakeupException e)
73       {
74       }
75       catch (RecordDeserializationException e)
76       {
77       }
78     }
79   }
80
81
82   Mono<ChatRoomInfo> sendCreateChatRoomRequest(
83       UUID chatRoomId,
84       String name)
85   {
86     int shard = this.shardingStrategy.selectShard(chatRoomId);
87     ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId, name, shard);
88     return Mono.create(sink ->
89     {
90       ProducerRecord<Integer, ChatRoomTo> record =
91           new ProducerRecord<>(
92               topic,
93               shard,
94               chatRoomTo);
95
96       producer.send(record, ((metadata, exception) ->
97       {
98         if (metadata != null)
99         {
100           log.info("Successfully send chreate-request {}", chatRoomTo);
101           sink.success(chatRoomTo.toChatRoomInfo());
102         }
103         else
104         {
105           // On send-failure
106           log.error(
107               "Could not create-request for chat-room={}, key={}, timestamp={}, text={}: {}",
108               chatRoomId,
109               key,
110               timestamp,
111               text,
112               exception);
113           sink.error(exception);
114         }
115       }));
116     });
117   }
118 }