NG
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatMessageChannel.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoom;
4 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
7 import lombok.Getter;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
11 import org.apache.kafka.clients.consumer.ConsumerRecord;
12 import org.apache.kafka.clients.consumer.ConsumerRecords;
13 import org.apache.kafka.clients.producer.Producer;
14 import org.apache.kafka.clients.producer.ProducerRecord;
15 import org.apache.kafka.common.TopicPartition;
16 import org.apache.kafka.common.errors.WakeupException;
17 import reactor.core.publisher.Flux;
18 import reactor.core.publisher.Mono;
19
20 import java.time.*;
21 import java.util.*;
22 import java.util.stream.IntStream;
23
24
25 @Slf4j
26 public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
27 {
28   private final String topic;
29   private final Producer<String, AbstractTo> producer;
30   private final Consumer<String, AbstractTo> consumer;
31   private final ZoneId zoneId;
32   private final int numShards;
33   private final boolean[] isShardOwned;
34   private final long[] currentOffset;
35   private final long[] nextOffset;
36   private final Map<UUID, ChatRoom>[] chatrooms;
37   private final KafkaLikeShardingStrategy shardingStrategy;
38
39   private boolean running;
40   @Getter
41   private volatile boolean loadInProgress;
42
43
44   public ChatMessageChannel(
45     String topic,
46     Producer<String, AbstractTo> producer,
47     Consumer<String, AbstractTo> consumer,
48     ZoneId zoneId,
49     int numShards)
50   {
51     log.debug(
52         "Creating ChatMessageChannel for topic {} with {} partitions",
53         topic,
54         numShards);
55     this.topic = topic;
56     this.consumer = consumer;
57     this.producer = producer;
58     this.zoneId = zoneId;
59     this.numShards = numShards;
60     this.isShardOwned = new boolean[numShards];
61     this.currentOffset = new long[numShards];
62     this.nextOffset = new long[numShards];
63     this.chatrooms = new Map[numShards];
64     IntStream
65         .range(0, numShards)
66         .forEach(shard -> this.chatrooms[shard] = new HashMap<>());
67     this.shardingStrategy = new KafkaLikeShardingStrategy(numShards);
68   }
69
70
71
72   Mono<ChatRoomInfo> sendCreateChatRoomRequest(
73       UUID chatRoomId,
74       String name)
75   {
76     CreateChatRoomRequestTo createChatRoomRequestTo = CreateChatRoomRequestTo.of(name);
77     return Mono.create(sink ->
78     {
79       ProducerRecord<String, CreateChatRoomRequestTo> record =
80           new ProducerRecord<>(
81               topic,
82               chatRoomId.toString(),
83               createChatRoomRequestTo);
84
85       producer.send(record, ((metadata, exception) ->
86       {
87         if (metadata != null)
88         {
89           log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
90           ChatRoomInfo chatRoomInfo = ChatRoomInfo.of(chatRoomId, name, record.partition());
91           createChatRoom(chatRoomInfo);
92           sink.success(chatRoomInfo);
93         }
94         else
95         {
96           // On send-failure
97           log.error(
98               "Could not send create-request for chat room (id={}, name={}): {}",
99               chatRoomId,
100               name,
101               exception);
102           sink.error(exception);
103         }
104       }));
105     });
106   }
107
108   Mono<Message> sendChatMessage(
109       UUID chatRoomId,
110       Message.MessageKey key,
111       LocalDateTime timestamp,
112       String text)
113   {
114     int shard = this.shardingStrategy.selectShard(chatRoomId);
115     TopicPartition tp = new TopicPartition(topic, shard);
116     ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
117     return Mono.create(sink ->
118     {
119       ProducerRecord<String, ChatMessageTo> record =
120           new ProducerRecord<>(
121               tp.topic(),
122               tp.partition(),
123               zdt.toEpochSecond(),
124               chatRoomId.toString(),
125               ChatMessageTo.of(key.getUsername(), key.getMessageId(), text));
126
127       producer.send(record, ((metadata, exception) ->
128       {
129         if (metadata != null)
130         {
131           // On successful send
132           Message message = new Message(key, metadata.offset(), timestamp, text);
133           log.info("Successfully send message {}", message);
134           sink.success(message);
135         }
136         else
137         {
138           // On send-failure
139           log.error(
140               "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
141               chatRoomId,
142               key,
143               timestamp,
144               text,
145               exception);
146           sink.error(exception);
147         }
148       }));
149     });
150   }
151
152   @Override
153   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
154   {
155     log.info("Newly assigned partitions! Pausing normal operations...");
156     loadInProgress = true;
157
158     consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
159     {
160       int partition = topicPartition.partition();
161       isShardOwned[partition] =  true;
162       this.currentOffset[partition] = currentOffset;
163
164       log.info(
165           "Partition assigned: {} - loading messages: next={} -> current={}",
166           partition,
167           nextOffset[partition],
168           currentOffset);
169
170       consumer.seek(topicPartition, nextOffset[partition]);
171     });
172
173     consumer.resume(partitions);
174   }
175
176   @Override
177   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
178   {
179     partitions.forEach(topicPartition ->
180     {
181       int partition = topicPartition.partition();
182       isShardOwned[partition] = false;
183       log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
184     });
185   }
186
187   @Override
188   public void onPartitionsLost(Collection<TopicPartition> partitions)
189   {
190     log.warn("Lost partitions: {}, partitions");
191     // TODO: Muss auf den Verlust anders reagiert werden?
192     onPartitionsRevoked(partitions);
193   }
194
195   @Override
196   public void run()
197   {
198     consumer.subscribe(List.of(topic), this);
199
200     running = true;
201
202     while (running)
203     {
204       try
205       {
206         ConsumerRecords<String, AbstractTo> records = consumer.poll(Duration.ofMinutes(5));
207         log.info("Fetched {} messages", records.count());
208
209         if (loadInProgress)
210         {
211           loadChatRoom(records);
212
213           if (isLoadingCompleted())
214           {
215             log.info("Loading of messages completed! Pausing all owned partitions...");
216             pauseAllOwnedPartions();
217             log.info("Resuming normal operations...");
218             loadInProgress = false;
219           }
220         }
221         else
222         {
223           if (!records.isEmpty())
224           {
225             throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
226           }
227         }
228       }
229       catch (WakeupException e)
230       {
231         log.info("Received WakeupException, exiting!");
232         running = false;
233       }
234     }
235
236     log.info("Exiting normally");
237   }
238
239   void loadChatRoom(ConsumerRecords<String, AbstractTo> records)
240   {
241     for (ConsumerRecord<String, AbstractTo> record : records)
242     {
243       UUID chatRoomId = UUID.fromString(record.key());
244
245       switch (record.value().getType())
246       {
247         case CREATE_CHATROOM_REQUEST:
248           createChatRoom(
249               chatRoomId,
250               (CreateChatRoomRequestTo) record.value(),
251               record.partition());
252           break;
253
254         case MESSAGE_SENT:
255           Instant instant = Instant.ofEpochSecond(record.timestamp());
256           LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
257           loadChatMessage(
258               chatRoomId,
259               timestamp,
260               record.offset(),
261               (ChatMessageTo) record.value(),
262               record.partition());
263           break;
264       }
265
266       nextOffset[record.partition()] = record.offset() + 1;
267     }
268   }
269
270   void createChatRoom(
271       UUID chatRoomId,
272       CreateChatRoomRequestTo createChatRoomRequestTo,
273       int partition)
274   {
275     putChatRoom(ChatRoomInfo.of(
276         chatRoomId,
277         createChatRoomRequestTo.getName(),
278         partition));
279   }
280
281
282   void createChatRoom(ChatRoomInfo chatRoomInfo)
283   {
284     UUID id = chatRoomInfo.getId();
285     String name = chatRoomInfo.getName();
286     int shard = chatRoomInfo.getShard();
287     log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
288     KafkaChatRoomService service = new KafkaChatRoomService(this, id);
289     ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
290     putChatRoom(chatRoom);
291   }
292
293   void loadChatMessage(
294       UUID chatRoomId,
295       LocalDateTime timestamp,
296       long offset,
297       ChatMessageTo chatMessageTo,
298       int partition)
299   {
300     Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
301     Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
302
303     ChatRoom chatRoom = chatrooms[partition].get(chatRoomId);
304     KafkaChatRoomService kafkaChatRoomService =
305         (KafkaChatRoomService) chatRoom.getChatRoomService();
306
307     kafkaChatRoomService.persistMessage(message);
308   }
309
310   boolean isLoadingCompleted()
311   {
312     return IntStream
313         .range(0, numShards)
314         .filter(shard -> isShardOwned[shard])
315         .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
316   }
317
318   void pauseAllOwnedPartions()
319   {
320     consumer.pause(IntStream
321         .range(0, numShards)
322         .filter(shard -> isShardOwned[shard])
323         .mapToObj(shard -> new TopicPartition(topic, shard))
324         .toList());
325   }
326
327
328   private void putChatRoom(ChatRoom chatRoom)
329   {
330     Integer partition = chatRoom.getShard();
331     UUID chatRoomId = chatRoom.getId();
332     if (chatrooms[partition].containsKey(chatRoomId))
333     {
334       log.warn("Ignoring existing chat-room: " + chatRoom);
335     }
336     else
337     {
338       log.info(
339           "Adding new chat-room to partition {}: {}",
340           partition,
341           chatRoom);
342
343       chatrooms[partition].put(chatRoomId, chatRoom);
344     }
345   }
346
347   Mono<ChatRoom> getChatRoom(int shard, UUID id)
348   {
349     return Mono.justOrEmpty(chatrooms[shard].get(id));
350   }
351
352   Flux<ChatRoom> getChatRooms()
353   {
354     return Flux.fromStream(IntStream
355         .range(0, numShards)
356         .filter(shard -> isShardOwned[shard])
357         .mapToObj(shard -> Integer.valueOf(shard))
358         .flatMap(shard -> chatrooms[shard].values().stream()));
359   }
360 }