45e93ccc3f17f17c031f536967f79b8c35702d99
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatRoomChannel.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.*;
4 import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
5 import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
6 import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
7 import lombok.Getter;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
11 import org.apache.kafka.clients.consumer.ConsumerRecord;
12 import org.apache.kafka.clients.consumer.ConsumerRecords;
13 import org.apache.kafka.clients.producer.Producer;
14 import org.apache.kafka.clients.producer.ProducerRecord;
15 import org.apache.kafka.common.TopicPartition;
16 import org.apache.kafka.common.errors.WakeupException;
17 import reactor.core.publisher.Flux;
18 import reactor.core.publisher.Mono;
19
20 import java.time.*;
21 import java.util.*;
22 import java.util.stream.IntStream;
23
24
/**
 * Channel between the chat backend and a single Kafka topic.
 *
 * <p>Sends create-chat-room commands and chat messages through the
 * {@link Producer} and, when run as a {@link Runnable}, polls the
 * {@link Consumer} to load the chat rooms and messages of the partitions
 * (shards) assigned to this instance. Assignment changes are tracked through
 * the {@link ConsumerRebalanceListener} callbacks.
 */
@Slf4j
public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
{
  /** Topic that records are sent to and whose partitions are paused/resumed. */
  private final String topic;
  /** Producer used to send commands and chat messages. */
  private final Producer<String, AbstractMessageTo> producer;
  /** Consumer that is polled in {@link #run()} to load the assigned partitions. */
  private final Consumer<String, AbstractMessageTo> consumer;
  /** Zone used to convert between local timestamps and record timestamps. */
  private final ZoneId zoneId;
  /** Number of shards; sizes all per-shard arrays below. */
  private final int numShards;
  /** Buffer size handed to every created {@code ChatRoomData}. */
  private final int bufferSize;
  /** Clock handed to every created {@code ChatRoomData}. */
  private final Clock clock;
  /** Per shard: set on partition assignment, cleared on revocation. */
  private final boolean[] isShardOwned;
  /** Per shard: end offset of the partition as captured on assignment. */
  private final long[] currentOffset;
  /** Per shard: offset of the next record to load (last processed offset + 1). */
  private final long[] nextOffset;
  /** Per shard: chat-room infos, keyed by chat-room id. */
  private final Map<UUID, ChatRoomInfo>[] chatRoomInfo;
  /** Per shard: chat-room data, keyed by chat-room id. */
  private final Map<UUID, ChatRoomData>[] chatRoomData;

  /** Loop flag of {@link #run()}; cleared when a {@link WakeupException} is caught. */
  private boolean running;
  /**
   * {@code true} from the moment partitions are assigned until all owned
   * shards have been loaded up to their captured end offset.
   */
  @Getter
  private volatile boolean loadInProgress;
44
45
46   public ChatRoomChannel(
47     String topic,
48     Producer<String, AbstractMessageTo> producer,
49     Consumer<String, AbstractMessageTo> consumer,
50     ZoneId zoneId,
51     int numShards,
52     int bufferSize,
53     Clock clock)
54   {
55     log.debug(
56         "Creating ChatRoomChannel for topic {} with {} partitions",
57         topic,
58         numShards);
59     this.topic = topic;
60     this.consumer = consumer;
61     this.producer = producer;
62     this.zoneId = zoneId;
63     this.numShards = numShards;
64     this.bufferSize = bufferSize;
65     this.clock = clock;
66     this.isShardOwned = new boolean[numShards];
67     this.currentOffset = new long[numShards];
68     this.nextOffset = new long[numShards];
69     this.chatRoomInfo = new Map[numShards];
70     this.chatRoomData = new Map[numShards];
71     IntStream
72         .range(0, numShards)
73         .forEach(shard ->
74         {
75           this.chatRoomInfo[shard] = new HashMap<>();
76           this.chatRoomData[shard] = new HashMap<>();
77         });
78   }
79
80
81
82   Mono<ChatRoomInfo> sendCreateChatRoomRequest(
83       UUID chatRoomId,
84       String name)
85   {
86     CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
87     return Mono.create(sink ->
88     {
89       ProducerRecord<String, AbstractMessageTo> record =
90           new ProducerRecord<>(
91               topic,
92               chatRoomId.toString(),
93               createChatRoomRequestTo);
94
95       producer.send(record, ((metadata, exception) ->
96       {
97         if (metadata != null)
98         {
99           log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
100           ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition());
101           createChatRoom(chatRoomInfo);
102           sink.success(chatRoomInfo);
103         }
104         else
105         {
106           // On send-failure
107           log.error(
108               "Could not send create-request for chat room (id={}, name={}): {}",
109               chatRoomId,
110               name,
111               exception);
112           sink.error(exception);
113         }
114       }));
115     });
116   }
117
118   Mono<Message> sendChatMessage(
119       UUID chatRoomId,
120       Message.MessageKey key,
121       LocalDateTime timestamp,
122       String text)
123   {
124     ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
125     return Mono.create(sink ->
126     {
127       ProducerRecord<String, AbstractMessageTo> record =
128           new ProducerRecord<>(
129               topic,
130               null,
131               zdt.toEpochSecond(),
132               chatRoomId.toString(),
133               EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
134
135       producer.send(record, ((metadata, exception) ->
136       {
137         if (metadata != null)
138         {
139           // On successful send
140           Message message = new Message(key, metadata.offset(), timestamp, text);
141           log.info("Successfully send message {}", message);
142           sink.success(message);
143         }
144         else
145         {
146           // On send-failure
147           log.error(
148               "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
149               chatRoomId,
150               key,
151               timestamp,
152               text,
153               exception);
154           sink.error(exception);
155         }
156       }));
157     });
158   }
159
160   @Override
161   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
162   {
163     log.info("Newly assigned partitions! Pausing normal operations...");
164     loadInProgress = true;
165
166     consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
167     {
168       int partition = topicPartition.partition();
169       isShardOwned[partition] =  true;
170       this.currentOffset[partition] = currentOffset;
171
172       log.info(
173           "Partition assigned: {} - loading messages: next={} -> current={}",
174           partition,
175           nextOffset[partition],
176           currentOffset);
177
178       consumer.seek(topicPartition, nextOffset[partition]);
179     });
180
181     consumer.resume(partitions);
182   }
183
184   @Override
185   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
186   {
187     partitions.forEach(topicPartition ->
188     {
189       int partition = topicPartition.partition();
190       isShardOwned[partition] = false;
191       log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
192     });
193   }
194
195   @Override
196   public void onPartitionsLost(Collection<TopicPartition> partitions)
197   {
198     log.warn("Lost partitions: {}, partitions");
199     // TODO: Muss auf den Verlust anders reagiert werden?
200     onPartitionsRevoked(partitions);
201   }
202
203   @Override
204   public void run()
205   {
206     running = true;
207
208     while (running)
209     {
210       try
211       {
212         ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
213         log.info("Fetched {} messages", records.count());
214
215         if (loadInProgress)
216         {
217           loadChatRoom(records);
218
219           if (isLoadingCompleted())
220           {
221             log.info("Loading of messages completed! Pausing all owned partitions...");
222             pauseAllOwnedPartions();
223             log.info("Resuming normal operations...");
224             loadInProgress = false;
225           }
226         }
227         else
228         {
229           if (!records.isEmpty())
230           {
231             throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
232           }
233         }
234       }
235       catch (WakeupException e)
236       {
237         log.info("Received WakeupException, exiting!");
238         running = false;
239       }
240     }
241
242     log.info("Exiting normally");
243   }
244
245   private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
246   {
247     for (ConsumerRecord<String, AbstractMessageTo> record : records)
248     {
249       UUID chatRoomId = UUID.fromString(record.key());
250
251       switch (record.value().getType())
252       {
253         case COMMAND_CREATE_CHATROOM:
254           createChatRoom(
255               chatRoomId,
256               (CommandCreateChatRoomTo) record.value(),
257               record.partition());
258           break;
259
260         case EVENT_CHATMESSAGE_RECEIVED:
261           Instant instant = Instant.ofEpochSecond(record.timestamp());
262           LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
263           loadChatMessage(
264               chatRoomId,
265               timestamp,
266               record.offset(),
267               (EventChatMessageReceivedTo) record.value(),
268               record.partition());
269           break;
270
271         default:
272           log.debug(
273               "Ignoring message for chat-room {} with offset {}: {}",
274               chatRoomId,
275               record.offset(),
276               record.value());
277       }
278
279       nextOffset[record.partition()] = record.offset() + 1;
280     }
281   }
282
283   private void createChatRoom(
284       UUID chatRoomId,
285       CommandCreateChatRoomTo createChatRoomRequestTo,
286       Integer partition)
287   {
288     log.info(
289         "Loading ChatRoom {} for shard {} with buffer-size {}",
290         chatRoomId,
291         partition,
292         bufferSize);
293     KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
294     ChatRoomData chatRoomData = new ChatRoomData(
295         clock,
296         service,
297         bufferSize);
298     putChatRoom(
299         chatRoomId,
300         createChatRoomRequestTo.getName(),
301         partition,
302         chatRoomData);
303   }
304
305
306   private void createChatRoom(ChatRoomInfo chatRoomInfo)
307   {
308     UUID id = chatRoomInfo.getId();
309     log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
310     KafkaChatRoomService service = new KafkaChatRoomService(this, id);
311     ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
312     putChatRoom(
313         chatRoomInfo.getId(),
314         chatRoomInfo.getName(),
315         chatRoomInfo.getShard(),
316         chatRoomData);
317   }
318
319   private void loadChatMessage(
320       UUID chatRoomId,
321       LocalDateTime timestamp,
322       long offset,
323       EventChatMessageReceivedTo chatMessageTo,
324       int partition)
325   {
326     Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
327     Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
328
329     ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId);
330     KafkaChatRoomService kafkaChatRoomService =
331         (KafkaChatRoomService) chatRoomData.getChatRoomService();
332
333     kafkaChatRoomService.persistMessage(message);
334   }
335
336   private boolean isLoadingCompleted()
337   {
338     return IntStream
339         .range(0, numShards)
340         .filter(shard -> isShardOwned[shard])
341         .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
342   }
343
344   private void pauseAllOwnedPartions()
345   {
346     consumer.pause(IntStream
347         .range(0, numShards)
348         .filter(shard -> isShardOwned[shard])
349         .mapToObj(shard -> new TopicPartition(topic, shard))
350         .toList());
351   }
352
353
354   private void putChatRoom(
355       UUID chatRoomId,
356       String name,
357       Integer partition,
358       ChatRoomData chatRoomData)
359   {
360     if (this.chatRoomInfo[partition].containsKey(chatRoomId))
361     {
362       log.warn(
363           "Ignoring existing chat-room for {}: {}",
364           partition,
365           chatRoomId);
366     }
367     else
368     {
369       log.info(
370           "Adding new chat-room to partition {}: {}",
371           partition,
372           chatRoomData);
373
374       this.chatRoomInfo[partition].put(
375           chatRoomId,
376           new ChatRoomInfo(chatRoomId, name, partition));
377       this.chatRoomData[partition].put(chatRoomId, chatRoomData);
378     }
379   }
380
381   int[] getOwnedShards()
382   {
383     return IntStream
384         .range(0, numShards)
385         .filter(shard -> isShardOwned[shard])
386         .toArray();
387   }
388
389   Mono<ChatRoomData> getChatRoomData(int shard, UUID id)
390   {
391     if (loadInProgress)
392     {
393       return Mono.error(new LoadInProgressException());
394     }
395
396     if (!isShardOwned[shard])
397     {
398       return Mono.error(new ShardNotOwnedException(shard));
399     }
400
401     return Mono.justOrEmpty(chatRoomData[shard].get(id));
402   }
403
404   Flux<ChatRoomInfo> getChatRoomInfo()
405   {
406     return Flux
407         .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
408         .filter(shard -> isShardOwned[shard])
409         .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values()));
410   }
411
412   Mono<ChatRoomInfo> getChatRoomInfo(int shard, UUID id)
413   {
414     if (loadInProgress)
415     {
416       return Mono.error(new LoadInProgressException());
417     }
418
419     if (!isShardOwned[shard])
420     {
421       return Mono.error(new ShardNotOwnedException(shard));
422     }
423
424     return Mono.justOrEmpty(chatRoomInfo[shard].get(id));
425   }
426
427   Flux<ChatRoomData> getChatRoomData()
428   {
429     return Flux
430         .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
431         .filter(shard -> isShardOwned[shard])
432         .flatMap(shard -> Flux.fromIterable(chatRoomData[shard].values()));
433   }
434 }