refactor: `ShardNotOwnedException` should be sent over `Mono.send()`
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatRoomChannel.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.*;
4 import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
5 import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
6 import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
7 import lombok.Getter;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
11 import org.apache.kafka.clients.consumer.ConsumerRecord;
12 import org.apache.kafka.clients.consumer.ConsumerRecords;
13 import org.apache.kafka.clients.producer.Producer;
14 import org.apache.kafka.clients.producer.ProducerRecord;
15 import org.apache.kafka.common.TopicPartition;
16 import org.apache.kafka.common.errors.WakeupException;
17 import reactor.core.publisher.Flux;
18 import reactor.core.publisher.Mono;
19
20 import java.time.*;
21 import java.util.*;
22 import java.util.stream.IntStream;
23
24
25 @Slf4j
26 public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
27 {
28   private final String topic;
29   private final Producer<String, AbstractMessageTo> producer;
30   private final Consumer<String, AbstractMessageTo> consumer;
31   private final ZoneId zoneId;
32   private final int numShards;
33   private final int bufferSize;
34   private final Clock clock;
35   private final boolean[] isShardOwned;
36   private final long[] currentOffset;
37   private final long[] nextOffset;
38   private final Map<UUID, ChatRoom>[] chatrooms;
39
40   private boolean running;
41   @Getter
42   private volatile boolean loadInProgress;
43
44
45   public ChatRoomChannel(
46     String topic,
47     Producer<String, AbstractMessageTo> producer,
48     Consumer<String, AbstractMessageTo> consumer,
49     ZoneId zoneId,
50     int numShards,
51     int bufferSize,
52     Clock clock)
53   {
54     log.debug(
55         "Creating ChatRoomChannel for topic {} with {} partitions",
56         topic,
57         numShards);
58     this.topic = topic;
59     this.consumer = consumer;
60     this.producer = producer;
61     this.zoneId = zoneId;
62     this.numShards = numShards;
63     this.bufferSize = bufferSize;
64     this.clock = clock;
65     this.isShardOwned = new boolean[numShards];
66     this.currentOffset = new long[numShards];
67     this.nextOffset = new long[numShards];
68     this.chatrooms = new Map[numShards];
69     IntStream
70         .range(0, numShards)
71         .forEach(shard -> this.chatrooms[shard] = new HashMap<>());
72   }
73
74
75
76   Mono<ChatRoomInfo> sendCreateChatRoomRequest(
77       UUID chatRoomId,
78       String name)
79   {
80     CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
81     return Mono.create(sink ->
82     {
83       ProducerRecord<String, AbstractMessageTo> record =
84           new ProducerRecord<>(
85               topic,
86               chatRoomId.toString(),
87               createChatRoomRequestTo);
88
89       producer.send(record, ((metadata, exception) ->
90       {
91         if (metadata != null)
92         {
93           log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
94           ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition());
95           createChatRoom(chatRoomInfo);
96           sink.success(chatRoomInfo);
97         }
98         else
99         {
100           // On send-failure
101           log.error(
102               "Could not send create-request for chat room (id={}, name={}): {}",
103               chatRoomId,
104               name,
105               exception);
106           sink.error(exception);
107         }
108       }));
109     });
110   }
111
112   Mono<Message> sendChatMessage(
113       UUID chatRoomId,
114       Message.MessageKey key,
115       LocalDateTime timestamp,
116       String text)
117   {
118     ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
119     return Mono.create(sink ->
120     {
121       ProducerRecord<String, AbstractMessageTo> record =
122           new ProducerRecord<>(
123               topic,
124               null,
125               zdt.toEpochSecond(),
126               chatRoomId.toString(),
127               EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
128
129       producer.send(record, ((metadata, exception) ->
130       {
131         if (metadata != null)
132         {
133           // On successful send
134           Message message = new Message(key, metadata.offset(), timestamp, text);
135           log.info("Successfully send message {}", message);
136           sink.success(message);
137         }
138         else
139         {
140           // On send-failure
141           log.error(
142               "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
143               chatRoomId,
144               key,
145               timestamp,
146               text,
147               exception);
148           sink.error(exception);
149         }
150       }));
151     });
152   }
153
154   @Override
155   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
156   {
157     log.info("Newly assigned partitions! Pausing normal operations...");
158     loadInProgress = true;
159
160     consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
161     {
162       int partition = topicPartition.partition();
163       isShardOwned[partition] =  true;
164       this.currentOffset[partition] = currentOffset;
165
166       log.info(
167           "Partition assigned: {} - loading messages: next={} -> current={}",
168           partition,
169           nextOffset[partition],
170           currentOffset);
171
172       consumer.seek(topicPartition, nextOffset[partition]);
173     });
174
175     consumer.resume(partitions);
176   }
177
178   @Override
179   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
180   {
181     partitions.forEach(topicPartition ->
182     {
183       int partition = topicPartition.partition();
184       isShardOwned[partition] = false;
185       log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
186     });
187   }
188
189   @Override
190   public void onPartitionsLost(Collection<TopicPartition> partitions)
191   {
192     log.warn("Lost partitions: {}, partitions");
193     // TODO: Muss auf den Verlust anders reagiert werden?
194     onPartitionsRevoked(partitions);
195   }
196
197   @Override
198   public void run()
199   {
200     running = true;
201
202     while (running)
203     {
204       try
205       {
206         ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
207         log.info("Fetched {} messages", records.count());
208
209         if (loadInProgress)
210         {
211           loadChatRoom(records);
212
213           if (isLoadingCompleted())
214           {
215             log.info("Loading of messages completed! Pausing all owned partitions...");
216             pauseAllOwnedPartions();
217             log.info("Resuming normal operations...");
218             loadInProgress = false;
219           }
220         }
221         else
222         {
223           if (!records.isEmpty())
224           {
225             throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
226           }
227         }
228       }
229       catch (WakeupException e)
230       {
231         log.info("Received WakeupException, exiting!");
232         running = false;
233       }
234     }
235
236     log.info("Exiting normally");
237   }
238
239   private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
240   {
241     for (ConsumerRecord<String, AbstractMessageTo> record : records)
242     {
243       UUID chatRoomId = UUID.fromString(record.key());
244
245       switch (record.value().getType())
246       {
247         case COMMAND_CREATE_CHATROOM:
248           createChatRoom(
249               chatRoomId,
250               (CommandCreateChatRoomTo) record.value(),
251               record.partition());
252           break;
253
254         case EVENT_CHATMESSAGE_RECEIVED:
255           Instant instant = Instant.ofEpochSecond(record.timestamp());
256           LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
257           loadChatMessage(
258               chatRoomId,
259               timestamp,
260               record.offset(),
261               (EventChatMessageReceivedTo) record.value(),
262               record.partition());
263           break;
264
265         default:
266           log.debug(
267               "Ignoring message for chat-room {} with offset {}: {}",
268               chatRoomId,
269               record.offset(),
270               record.value());
271       }
272
273       nextOffset[record.partition()] = record.offset() + 1;
274     }
275   }
276
277   private void createChatRoom(
278       UUID chatRoomId,
279       CommandCreateChatRoomTo createChatRoomRequestTo,
280       int partition)
281   {
282     log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize);
283     KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
284     ChatRoom chatRoom = new ChatRoom(
285         chatRoomId,
286         createChatRoomRequestTo.getName(),
287         partition,
288         clock,
289         service,
290         bufferSize);
291     putChatRoom(chatRoom);
292   }
293
294
295   private void createChatRoom(ChatRoomInfo chatRoomInfo)
296   {
297     UUID id = chatRoomInfo.getId();
298     String name = chatRoomInfo.getName();
299     int shard = chatRoomInfo.getShard();
300     log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
301     KafkaChatRoomService service = new KafkaChatRoomService(this, id);
302     ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
303     putChatRoom(chatRoom);
304   }
305
306   private void loadChatMessage(
307       UUID chatRoomId,
308       LocalDateTime timestamp,
309       long offset,
310       EventChatMessageReceivedTo chatMessageTo,
311       int partition)
312   {
313     Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
314     Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
315
316     ChatRoom chatRoom = chatrooms[partition].get(chatRoomId);
317     KafkaChatRoomService kafkaChatRoomService =
318         (KafkaChatRoomService) chatRoom.getChatRoomService();
319
320     kafkaChatRoomService.persistMessage(message);
321   }
322
323   private boolean isLoadingCompleted()
324   {
325     return IntStream
326         .range(0, numShards)
327         .filter(shard -> isShardOwned[shard])
328         .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
329   }
330
331   private void pauseAllOwnedPartions()
332   {
333     consumer.pause(IntStream
334         .range(0, numShards)
335         .filter(shard -> isShardOwned[shard])
336         .mapToObj(shard -> new TopicPartition(topic, shard))
337         .toList());
338   }
339
340
341   private void putChatRoom(ChatRoom chatRoom)
342   {
343     Integer partition = chatRoom.getShard();
344     UUID chatRoomId = chatRoom.getId();
345     if (chatrooms[partition].containsKey(chatRoomId))
346     {
347       log.warn("Ignoring existing chat-room: " + chatRoom);
348     }
349     else
350     {
351       log.info(
352           "Adding new chat-room to partition {}: {}",
353           partition,
354           chatRoom);
355
356       chatrooms[partition].put(chatRoomId, chatRoom);
357     }
358   }
359
360   int[] getOwnedShards()
361   {
362     return IntStream
363         .range(0, numShards)
364         .filter(shard -> isShardOwned[shard])
365         .toArray();
366   }
367
368   Mono<ChatRoom> getChatRoom(int shard, UUID id)
369   {
370     if (loadInProgress)
371     {
372       return Mono.error(new LoadInProgressException());
373     }
374
375     if (!isShardOwned[shard])
376     {
377       return Mono.error(new ShardNotOwnedException(shard));
378     }
379
380     return Mono.justOrEmpty(chatrooms[shard].get(id));
381   }
382
383   Flux<ChatRoom> getChatRooms()
384   {
385     return Flux
386         .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
387         .filter(shard -> isShardOwned[shard])
388         .flatMap(shard -> Flux.fromIterable(chatrooms[shard].values()));
389   }
390 }