fix: GREEN - Correct handling of unknown chat-rooms in the Kafka-version
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatRoomChannel.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoom;
4 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
7 import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
8 import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
9 import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
10 import lombok.Getter;
11 import lombok.extern.slf4j.Slf4j;
12 import org.apache.kafka.clients.consumer.Consumer;
13 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
14 import org.apache.kafka.clients.consumer.ConsumerRecord;
15 import org.apache.kafka.clients.consumer.ConsumerRecords;
16 import org.apache.kafka.clients.producer.Producer;
17 import org.apache.kafka.clients.producer.ProducerRecord;
18 import org.apache.kafka.common.TopicPartition;
19 import org.apache.kafka.common.errors.WakeupException;
20 import reactor.core.publisher.Flux;
21 import reactor.core.publisher.Mono;
22
23 import java.time.*;
24 import java.util.*;
25 import java.util.stream.IntStream;
26
27
28 @Slf4j
29 public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
30 {
31   private final String topic;
32   private final Producer<String, AbstractMessageTo> producer;
33   private final Consumer<String, AbstractMessageTo> consumer;
34   private final ZoneId zoneId;
35   private final int numShards;
36   private final int bufferSize;
37   private final Clock clock;
38   private final boolean[] isShardOwned;
39   private final long[] currentOffset;
40   private final long[] nextOffset;
41   private final Map<UUID, ChatRoom>[] chatrooms;
42
43   private boolean running;
44   @Getter
45   private volatile boolean loadInProgress;
46
47
48   public ChatRoomChannel(
49     String topic,
50     Producer<String, AbstractMessageTo> producer,
51     Consumer<String, AbstractMessageTo> consumer,
52     ZoneId zoneId,
53     int numShards,
54     int bufferSize,
55     Clock clock)
56   {
57     log.debug(
58         "Creating ChatRoomChannel for topic {} with {} partitions",
59         topic,
60         numShards);
61     this.topic = topic;
62     this.consumer = consumer;
63     this.producer = producer;
64     this.zoneId = zoneId;
65     this.numShards = numShards;
66     this.bufferSize = bufferSize;
67     this.clock = clock;
68     this.isShardOwned = new boolean[numShards];
69     this.currentOffset = new long[numShards];
70     this.nextOffset = new long[numShards];
71     this.chatrooms = new Map[numShards];
72     IntStream
73         .range(0, numShards)
74         .forEach(shard -> this.chatrooms[shard] = new HashMap<>());
75   }
76
77
78
79   Mono<ChatRoomInfo> sendCreateChatRoomRequest(
80       UUID chatRoomId,
81       String name)
82   {
83     CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
84     return Mono.create(sink ->
85     {
86       ProducerRecord<String, AbstractMessageTo> record =
87           new ProducerRecord<>(
88               topic,
89               chatRoomId.toString(),
90               createChatRoomRequestTo);
91
92       producer.send(record, ((metadata, exception) ->
93       {
94         if (metadata != null)
95         {
96           log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
97           ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition());
98           createChatRoom(chatRoomInfo);
99           sink.success(chatRoomInfo);
100         }
101         else
102         {
103           // On send-failure
104           log.error(
105               "Could not send create-request for chat room (id={}, name={}): {}",
106               chatRoomId,
107               name,
108               exception);
109           sink.error(exception);
110         }
111       }));
112     });
113   }
114
115   Mono<Message> sendChatMessage(
116       UUID chatRoomId,
117       Message.MessageKey key,
118       LocalDateTime timestamp,
119       String text)
120   {
121     ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
122     return Mono.create(sink ->
123     {
124       ProducerRecord<String, AbstractMessageTo> record =
125           new ProducerRecord<>(
126               topic,
127               null,
128               zdt.toEpochSecond(),
129               chatRoomId.toString(),
130               EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
131
132       producer.send(record, ((metadata, exception) ->
133       {
134         if (metadata != null)
135         {
136           // On successful send
137           Message message = new Message(key, metadata.offset(), timestamp, text);
138           log.info("Successfully send message {}", message);
139           sink.success(message);
140         }
141         else
142         {
143           // On send-failure
144           log.error(
145               "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
146               chatRoomId,
147               key,
148               timestamp,
149               text,
150               exception);
151           sink.error(exception);
152         }
153       }));
154     });
155   }
156
157   @Override
158   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
159   {
160     log.info("Newly assigned partitions! Pausing normal operations...");
161     loadInProgress = true;
162
163     consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
164     {
165       int partition = topicPartition.partition();
166       isShardOwned[partition] =  true;
167       this.currentOffset[partition] = currentOffset;
168
169       log.info(
170           "Partition assigned: {} - loading messages: next={} -> current={}",
171           partition,
172           nextOffset[partition],
173           currentOffset);
174
175       consumer.seek(topicPartition, nextOffset[partition]);
176     });
177
178     consumer.resume(partitions);
179   }
180
181   @Override
182   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
183   {
184     partitions.forEach(topicPartition ->
185     {
186       int partition = topicPartition.partition();
187       isShardOwned[partition] = false;
188       log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
189     });
190   }
191
192   @Override
193   public void onPartitionsLost(Collection<TopicPartition> partitions)
194   {
195     log.warn("Lost partitions: {}, partitions");
196     // TODO: Muss auf den Verlust anders reagiert werden?
197     onPartitionsRevoked(partitions);
198   }
199
200   @Override
201   public void run()
202   {
203     running = true;
204
205     while (running)
206     {
207       try
208       {
209         ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
210         log.info("Fetched {} messages", records.count());
211
212         if (loadInProgress)
213         {
214           loadChatRoom(records);
215
216           if (isLoadingCompleted())
217           {
218             log.info("Loading of messages completed! Pausing all owned partitions...");
219             pauseAllOwnedPartions();
220             log.info("Resuming normal operations...");
221             loadInProgress = false;
222           }
223         }
224         else
225         {
226           if (!records.isEmpty())
227           {
228             throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
229           }
230         }
231       }
232       catch (WakeupException e)
233       {
234         log.info("Received WakeupException, exiting!");
235         running = false;
236       }
237     }
238
239     log.info("Exiting normally");
240   }
241
242   private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
243   {
244     for (ConsumerRecord<String, AbstractMessageTo> record : records)
245     {
246       UUID chatRoomId = UUID.fromString(record.key());
247
248       switch (record.value().getType())
249       {
250         case COMMAND_CREATE_CHATROOM:
251           createChatRoom(
252               chatRoomId,
253               (CommandCreateChatRoomTo) record.value(),
254               record.partition());
255           break;
256
257         case EVENT_CHATMESSAGE_RECEIVED:
258           Instant instant = Instant.ofEpochSecond(record.timestamp());
259           LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
260           loadChatMessage(
261               chatRoomId,
262               timestamp,
263               record.offset(),
264               (EventChatMessageReceivedTo) record.value(),
265               record.partition());
266           break;
267
268         default:
269           log.debug(
270               "Ignoring message for chat-room {} with offset {}: {}",
271               chatRoomId,
272               record.offset(),
273               record.value());
274       }
275
276       nextOffset[record.partition()] = record.offset() + 1;
277     }
278   }
279
280   private void createChatRoom(
281       UUID chatRoomId,
282       CommandCreateChatRoomTo createChatRoomRequestTo,
283       int partition)
284   {
285     log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize);
286     KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
287     ChatRoom chatRoom = new ChatRoom(
288         chatRoomId,
289         createChatRoomRequestTo.getName(),
290         partition,
291         clock,
292         service,
293         bufferSize);
294     putChatRoom(chatRoom);
295   }
296
297
298   private void createChatRoom(ChatRoomInfo chatRoomInfo)
299   {
300     UUID id = chatRoomInfo.getId();
301     String name = chatRoomInfo.getName();
302     int shard = chatRoomInfo.getShard();
303     log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
304     KafkaChatRoomService service = new KafkaChatRoomService(this, id);
305     ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
306     putChatRoom(chatRoom);
307   }
308
309   private void loadChatMessage(
310       UUID chatRoomId,
311       LocalDateTime timestamp,
312       long offset,
313       EventChatMessageReceivedTo chatMessageTo,
314       int partition)
315   {
316     Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
317     Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
318
319     ChatRoom chatRoom = chatrooms[partition].get(chatRoomId);
320     KafkaChatRoomService kafkaChatRoomService =
321         (KafkaChatRoomService) chatRoom.getChatRoomService();
322
323     kafkaChatRoomService.persistMessage(message);
324   }
325
326   private boolean isLoadingCompleted()
327   {
328     return IntStream
329         .range(0, numShards)
330         .filter(shard -> isShardOwned[shard])
331         .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
332   }
333
334   private void pauseAllOwnedPartions()
335   {
336     consumer.pause(IntStream
337         .range(0, numShards)
338         .filter(shard -> isShardOwned[shard])
339         .mapToObj(shard -> new TopicPartition(topic, shard))
340         .toList());
341   }
342
343
344   private void putChatRoom(ChatRoom chatRoom)
345   {
346     Integer partition = chatRoom.getShard();
347     UUID chatRoomId = chatRoom.getId();
348     if (chatrooms[partition].containsKey(chatRoomId))
349     {
350       log.warn("Ignoring existing chat-room: " + chatRoom);
351     }
352     else
353     {
354       log.info(
355           "Adding new chat-room to partition {}: {}",
356           partition,
357           chatRoom);
358
359       chatrooms[partition].put(chatRoomId, chatRoom);
360     }
361   }
362
363   int[] getOwnedShards()
364   {
365     return IntStream
366         .range(0, numShards)
367         .filter(shard -> isShardOwned[shard])
368         .toArray();
369   }
370
371   Mono<ChatRoom> getChatRoom(int shard, UUID id)
372   {
373     if (loadInProgress)
374     {
375       throw new LoadInProgressException(shard);
376     }
377
378     if (!isShardOwned[shard])
379     {
380       throw new ShardNotOwnedException(shard);
381     }
382
383     return Mono.justOrEmpty(chatrooms[shard].get(id));
384   }
385
386   Flux<ChatRoom> getChatRooms()
387   {
388     return Flux
389         .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
390         .filter(shard -> isShardOwned[shard])
391         .flatMap(shard -> Flux.fromIterable(chatrooms[shard].values()));
392   }
393 }