refactor: Moved exceptions into package `exceptions` - Aligned Code
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatRoomChannel.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.*;
4 import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
5 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
6 import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
7 import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
8 import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
9 import lombok.Getter;
10 import lombok.extern.slf4j.Slf4j;
11 import org.apache.kafka.clients.consumer.Consumer;
12 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
13 import org.apache.kafka.clients.consumer.ConsumerRecord;
14 import org.apache.kafka.clients.consumer.ConsumerRecords;
15 import org.apache.kafka.clients.producer.Producer;
16 import org.apache.kafka.clients.producer.ProducerRecord;
17 import org.apache.kafka.common.TopicPartition;
18 import org.apache.kafka.common.errors.WakeupException;
19 import reactor.core.publisher.Flux;
20 import reactor.core.publisher.Mono;
21
22 import java.time.*;
23 import java.util.*;
24 import java.util.stream.IntStream;
25
26
27 @Slf4j
28 public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
29 {
  // Name of the Kafka topic that records are sent to; also used to build
  // the TopicPartition instances when pausing partitions.
  private final String topic;
  // Used to send create-requests and chat messages to the topic.
  private final Producer<String, AbstractMessageTo> producer;
  // Polled in run(); sought, paused and resumed on a per-partition basis.
  private final Consumer<String, AbstractMessageTo> consumer;
  // Zone used to convert between LocalDateTime and the record timestamps.
  private final ZoneId zoneId;
  // Number of shards; all per-shard arrays below have this length and are
  // indexed by the partition number.
  private final int numShards;
  // Passed through to every ChatRoomData that is created.
  private final int bufferSize;
  // Passed through to every ChatRoomData that is created.
  private final Clock clock;
  // Per shard: set when the partition is assigned, cleared when it is revoked.
  private final boolean[] isShardOwned;
  // Per shard: end offset of the partition as reported on assignment;
  // loading is complete once nextOffset has reached this value.
  private final long[] currentOffset;
  // Per shard: offset of the next record to load (last loaded offset + 1).
  private final long[] nextOffset;
  // Per shard: the known chat rooms, keyed by chat-room id.
  private final Map<UUID, ChatRoomInfo>[] chatRoomInfo;
  // Per shard: the data of the known chat rooms, keyed by chat-room id.
  private final Map<UUID, ChatRoomData>[] chatRoomData;

  // Loop condition of run(); cleared when a WakeupException is caught.
  private boolean running;
  // True from a partition assignment until all owned partitions have been
  // read up to their recorded end offsets. Volatile, getter generated by Lombok.
  @Getter
  private volatile boolean loadInProgress;
46
47
48   public ChatRoomChannel(
49     String topic,
50     Producer<String, AbstractMessageTo> producer,
51     Consumer<String, AbstractMessageTo> consumer,
52     ZoneId zoneId,
53     int numShards,
54     int bufferSize,
55     Clock clock)
56   {
57     log.debug(
58         "Creating ChatRoomChannel for topic {} with {} partitions",
59         topic,
60         numShards);
61     this.topic = topic;
62     this.consumer = consumer;
63     this.producer = producer;
64     this.zoneId = zoneId;
65     this.numShards = numShards;
66     this.bufferSize = bufferSize;
67     this.clock = clock;
68     this.isShardOwned = new boolean[numShards];
69     this.currentOffset = new long[numShards];
70     this.nextOffset = new long[numShards];
71     this.chatRoomInfo = new Map[numShards];
72     this.chatRoomData = new Map[numShards];
73     IntStream
74         .range(0, numShards)
75         .forEach(shard ->
76         {
77           this.chatRoomInfo[shard] = new HashMap<>();
78           this.chatRoomData[shard] = new HashMap<>();
79         });
80   }
81
82
83
84   Mono<ChatRoomInfo> sendCreateChatRoomRequest(
85       UUID chatRoomId,
86       String name)
87   {
88     CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
89     return Mono.create(sink ->
90     {
91       ProducerRecord<String, AbstractMessageTo> record =
92           new ProducerRecord<>(
93               topic,
94               chatRoomId.toString(),
95               createChatRoomRequestTo);
96
97       producer.send(record, ((metadata, exception) ->
98       {
99         if (metadata != null)
100         {
101           log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
102           ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition());
103           createChatRoom(chatRoomInfo);
104           sink.success(chatRoomInfo);
105         }
106         else
107         {
108           // On send-failure
109           log.error(
110               "Could not send create-request for chat room (id={}, name={}): {}",
111               chatRoomId,
112               name,
113               exception);
114           sink.error(exception);
115         }
116       }));
117     });
118   }
119
120   Mono<Message> sendChatMessage(
121       UUID chatRoomId,
122       Message.MessageKey key,
123       LocalDateTime timestamp,
124       String text)
125   {
126     ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
127     return Mono.create(sink ->
128     {
129       ProducerRecord<String, AbstractMessageTo> record =
130           new ProducerRecord<>(
131               topic,
132               null,
133               zdt.toEpochSecond(),
134               chatRoomId.toString(),
135               EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
136
137       producer.send(record, ((metadata, exception) ->
138       {
139         if (metadata != null)
140         {
141           // On successful send
142           Message message = new Message(key, metadata.offset(), timestamp, text);
143           log.info("Successfully send message {}", message);
144           sink.success(message);
145         }
146         else
147         {
148           // On send-failure
149           log.error(
150               "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
151               chatRoomId,
152               key,
153               timestamp,
154               text,
155               exception);
156           sink.error(exception);
157         }
158       }));
159     });
160   }
161
162   @Override
163   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
164   {
165     log.info("Newly assigned partitions! Pausing normal operations...");
166     loadInProgress = true;
167
168     consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
169     {
170       int partition = topicPartition.partition();
171       isShardOwned[partition] =  true;
172       this.currentOffset[partition] = currentOffset;
173
174       log.info(
175           "Partition assigned: {} - loading messages: next={} -> current={}",
176           partition,
177           nextOffset[partition],
178           currentOffset);
179
180       consumer.seek(topicPartition, nextOffset[partition]);
181     });
182
183     consumer.resume(partitions);
184   }
185
186   @Override
187   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
188   {
189     partitions.forEach(topicPartition ->
190     {
191       int partition = topicPartition.partition();
192       isShardOwned[partition] = false;
193       log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
194     });
195   }
196
197   @Override
198   public void onPartitionsLost(Collection<TopicPartition> partitions)
199   {
200     log.warn("Lost partitions: {}, partitions");
201     // TODO: Muss auf den Verlust anders reagiert werden?
202     onPartitionsRevoked(partitions);
203   }
204
205   @Override
206   public void run()
207   {
208     running = true;
209
210     while (running)
211     {
212       try
213       {
214         ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
215         log.info("Fetched {} messages", records.count());
216
217         if (loadInProgress)
218         {
219           loadChatRoom(records);
220
221           if (isLoadingCompleted())
222           {
223             log.info("Loading of messages completed! Pausing all owned partitions...");
224             pauseAllOwnedPartions();
225             log.info("Resuming normal operations...");
226             loadInProgress = false;
227           }
228         }
229         else
230         {
231           if (!records.isEmpty())
232           {
233             throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
234           }
235         }
236       }
237       catch (WakeupException e)
238       {
239         log.info("Received WakeupException, exiting!");
240         running = false;
241       }
242     }
243
244     log.info("Exiting normally");
245   }
246
247   private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
248   {
249     for (ConsumerRecord<String, AbstractMessageTo> record : records)
250     {
251       UUID chatRoomId = UUID.fromString(record.key());
252
253       switch (record.value().getType())
254       {
255         case COMMAND_CREATE_CHATROOM:
256           createChatRoom(
257               chatRoomId,
258               (CommandCreateChatRoomTo) record.value(),
259               record.partition());
260           break;
261
262         case EVENT_CHATMESSAGE_RECEIVED:
263           Instant instant = Instant.ofEpochSecond(record.timestamp());
264           LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
265           loadChatMessage(
266               chatRoomId,
267               timestamp,
268               record.offset(),
269               (EventChatMessageReceivedTo) record.value(),
270               record.partition());
271           break;
272
273         default:
274           log.debug(
275               "Ignoring message for chat-room {} with offset {}: {}",
276               chatRoomId,
277               record.offset(),
278               record.value());
279       }
280
281       nextOffset[record.partition()] = record.offset() + 1;
282     }
283   }
284
285   private void createChatRoom(
286       UUID chatRoomId,
287       CommandCreateChatRoomTo createChatRoomRequestTo,
288       Integer partition)
289   {
290     log.info(
291         "Loading ChatRoom {} for shard {} with buffer-size {}",
292         chatRoomId,
293         partition,
294         bufferSize);
295     KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
296     ChatRoomData chatRoomData = new ChatRoomData(
297         clock,
298         service,
299         bufferSize);
300     putChatRoom(
301         chatRoomId,
302         createChatRoomRequestTo.getName(),
303         partition,
304         chatRoomData);
305   }
306
307
308   private void createChatRoom(ChatRoomInfo chatRoomInfo)
309   {
310     UUID id = chatRoomInfo.getId();
311     log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
312     KafkaChatRoomService service = new KafkaChatRoomService(this, id);
313     ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
314     putChatRoom(
315         chatRoomInfo.getId(),
316         chatRoomInfo.getName(),
317         chatRoomInfo.getShard(),
318         chatRoomData);
319   }
320
321   private void loadChatMessage(
322       UUID chatRoomId,
323       LocalDateTime timestamp,
324       long offset,
325       EventChatMessageReceivedTo chatMessageTo,
326       int partition)
327   {
328     Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
329     Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
330
331     ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId);
332     KafkaChatRoomService kafkaChatRoomService =
333         (KafkaChatRoomService) chatRoomData.getChatRoomService();
334
335     kafkaChatRoomService.persistMessage(message);
336   }
337
338   private boolean isLoadingCompleted()
339   {
340     return IntStream
341         .range(0, numShards)
342         .filter(shard -> isShardOwned[shard])
343         .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
344   }
345
346   private void pauseAllOwnedPartions()
347   {
348     consumer.pause(IntStream
349         .range(0, numShards)
350         .filter(shard -> isShardOwned[shard])
351         .mapToObj(shard -> new TopicPartition(topic, shard))
352         .toList());
353   }
354
355
356   private void putChatRoom(
357       UUID chatRoomId,
358       String name,
359       Integer partition,
360       ChatRoomData chatRoomData)
361   {
362     if (this.chatRoomInfo[partition].containsKey(chatRoomId))
363     {
364       log.warn(
365           "Ignoring existing chat-room for {}: {}",
366           partition,
367           chatRoomId);
368     }
369     else
370     {
371       log.info(
372           "Adding new chat-room to partition {}: {}",
373           partition,
374           chatRoomData);
375
376       this.chatRoomInfo[partition].put(
377           chatRoomId,
378           new ChatRoomInfo(chatRoomId, name, partition));
379       this.chatRoomData[partition].put(chatRoomId, chatRoomData);
380     }
381   }
382
383   int[] getOwnedShards()
384   {
385     return IntStream
386         .range(0, numShards)
387         .filter(shard -> isShardOwned[shard])
388         .toArray();
389   }
390
391   Mono<ChatRoomData> getChatRoomData(int shard, UUID id)
392   {
393     if (loadInProgress)
394     {
395       return Mono.error(new LoadInProgressException());
396     }
397
398     if (!isShardOwned[shard])
399     {
400       return Mono.error(new ShardNotOwnedException(shard));
401     }
402
403     return Mono.justOrEmpty(chatRoomData[shard].get(id));
404   }
405
406   Flux<ChatRoomInfo> getChatRoomInfo()
407   {
408     return Flux
409         .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
410         .filter(shard -> isShardOwned[shard])
411         .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values()));
412   }
413
414   Mono<ChatRoomInfo> getChatRoomInfo(int shard, UUID id)
415   {
416     if (loadInProgress)
417     {
418       return Mono.error(new LoadInProgressException());
419     }
420
421     if (!isShardOwned[shard])
422     {
423       return Mono.error(new ShardNotOwnedException(shard));
424     }
425
426     return Mono.justOrEmpty(chatRoomInfo[shard].get(id));
427   }
428
429   Flux<ChatRoomData> getChatRoomData()
430   {
431     return Flux
432         .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
433         .filter(shard -> isShardOwned[shard])
434         .flatMap(shard -> Flux.fromIterable(chatRoomData[shard].values()));
435   }
436 }