1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.*;
4 import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
5 import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
6 import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
11 import org.apache.kafka.clients.consumer.ConsumerRecord;
12 import org.apache.kafka.clients.consumer.ConsumerRecords;
13 import org.apache.kafka.clients.producer.Producer;
14 import org.apache.kafka.clients.producer.ProducerRecord;
15 import org.apache.kafka.common.TopicPartition;
16 import org.apache.kafka.common.errors.WakeupException;
17 import reactor.core.publisher.Flux;
18 import reactor.core.publisher.Mono;
22 import java.util.stream.IntStream;
// Kafka-backed channel for chat rooms. It is executed as a task (Runnable) and is
// notified about partition assignments and revocations (ConsumerRebalanceListener).
26 public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
// Topic that carries the chat-room messages; its partitions are addressed as shards.
28 private final String topic;
29 private final Producer<String, AbstractMessageTo> producer;
30 private final Consumer<String, AbstractMessageTo> consumer;
// Zone used to convert between LocalDateTime and record timestamps.
31 private final ZoneId zoneId;
32 private final int numShards;
33 private final int bufferSize;
34 private final Clock clock;
// All arrays below are indexed by shard (= partition number).
// true while the partition is assigned to this instance.
35 private final boolean[] isShardOwned;
// End offset of the partition as captured when it was assigned.
36 private final long[] currentOffset;
// Offset of the next record to process (last processed offset + 1).
37 private final long[] nextOffset;
// Chat-room meta data and chat-room state per shard, keyed by chat-room id.
38 private final Map<UUID, ChatRoomInfo>[] chatRoomInfo;
39 private final Map<UUID, ChatRoomData>[] chatRoomData;
41 private boolean running;
// Set on partition assignment, cleared once loading of the owned shards has completed.
43 private volatile boolean loadInProgress;
// Creates the channel and allocates the per-shard state, one slot per shard.
46 public ChatRoomChannel(
48 Producer<String, AbstractMessageTo> producer,
49 Consumer<String, AbstractMessageTo> consumer,
56 "Creating ChatRoomChannel for topic {} with {} partitions",
60 this.consumer = consumer;
61 this.producer = producer;
63 this.numShards = numShards;
64 this.bufferSize = bufferSize;
66 this.isShardOwned = new boolean[numShards];
67 this.currentOffset = new long[numShards];
68 this.nextOffset = new long[numShards];
// Arrays of a generic type cannot be created directly, hence the raw Map[] (unchecked).
69 this.chatRoomInfo = new Map[numShards];
70 this.chatRoomData = new Map[numShards];
// Each shard slot gets its own, initially empty, maps.
75 this.chatRoomInfo[shard] = new HashMap<>();
76 this.chatRoomData[shard] = new HashMap<>();
// Sends a create-chat-room command, keyed by the chat-room id, and completes the
// returned Mono from within the producer callback.
82 Mono<ChatRoomInfo> sendCreateChatRoomRequest(
86 CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
87 return Mono.create(sink ->
89 ProducerRecord<String, AbstractMessageTo> record =
92 chatRoomId.toString(),
93 createChatRoomRequestTo);
95 producer.send(record, ((metadata, exception) ->
// NOTE(review): the log text below contains the typo "chreate-request".
99 log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
// NOTE(review): ProducerRecord.partition() is the partition requested when the record was
// built and is null if none was given; the partition actually written to is
// metadata.partition() - confirm which one is intended here.
100 ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition());
// Register the new chat room locally before signalling success to the subscriber.
101 createChatRoom(chatRoomInfo);
102 sink.success(chatRoomInfo);
108 "Could not send create-request for chat room (id={}, name={}): {}",
// The send failure is propagated to the subscriber.
112 sink.error(exception);
// Sends a chat message as an event, keyed by the chat-room id, and completes the
// returned Mono from within the producer callback.
118 Mono<Message> sendChatMessage(
120 Message.MessageKey key,
121 LocalDateTime timestamp,
// Interpret the local timestamp in the configured zone.
124 ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
125 return Mono.create(sink ->
127 ProducerRecord<String, AbstractMessageTo> record =
128 new ProducerRecord<>(
132 chatRoomId.toString(),
133 EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
135 producer.send(record, ((metadata, exception) ->
// Non-null metadata is used as the indicator for a successful send.
137 if (metadata != null)
139 // On successful send
// The offset reported for the record becomes the serial number of the message.
140 Message message = new Message(key, metadata.offset(), timestamp, text);
141 log.info("Successfully send message {}", message);
142 sink.success(message);
148 "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
// The send failure is propagated to the subscriber.
154 sink.error(exception);
// Marks the newly assigned partitions as owned, remembers their end offsets as the
// loading target, positions each partition at its stored nextOffset and resumes them.
161 public void onPartitionsAssigned(Collection<TopicPartition> partitions)
163 log.info("Newly assigned partitions! Pausing normal operations...");
164 loadInProgress = true;
// The lambda parameter currentOffset shadows the field, hence this.currentOffset below.
166 consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
168 int partition = topicPartition.partition();
169 isShardOwned[partition] = true;
170 this.currentOffset[partition] = currentOffset;
173 "Partition assigned: {} - loading messages: next={} -> current={}",
175 nextOffset[partition],
// Continue reading where this instance stopped for that partition.
178 consumer.seek(topicPartition, nextOffset[partition]);
181 consumer.resume(partitions);
185 public void onPartitionsRevoked(Collection<TopicPartition> partitions)
187 partitions.forEach(topicPartition ->
189 int partition = topicPartition.partition();
190 isShardOwned[partition] = false;
191 log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
196 public void onPartitionsLost(Collection<TopicPartition> partitions)
198 log.warn("Lost partitions: {}, partitions");
199 // TODO: Muss auf den Verlust anders reagiert werden?
200 onPartitionsRevoked(partitions);
// Fetch the next batch; the poll waits for up to five minutes.
212 ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
213 log.info("Fetched {} messages", records.count());
// Feed the fetched records into the in-memory state
// (presumably only while loadInProgress is set - the guard is not visible in this excerpt).
217 loadChatRoom(records);
// Once every owned shard has caught up, stop fetching and reopen for normal operations.
219 if (isLoadingCompleted())
221 log.info("Loading of messages completed! Pausing all owned partitions...");
222 pauseAllOwnedPartions();
223 log.info("Resuming normal operations...");
224 loadInProgress = false;
// Receiving records here is treated as an illegal state, see the exception message.
229 if (!records.isEmpty())
231 throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
// A WakeupException is only logged as the reason for exiting.
235 catch (WakeupException e)
237 log.info("Received WakeupException, exiting!");
242 log.info("Exiting normally");
// Replays the fetched records into the in-memory state, dispatching on the message type.
245 private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
247 for (ConsumerRecord<String, AbstractMessageTo> record : records)
// The record key is the chat-room id in its string form.
249 UUID chatRoomId = UUID.fromString(record.key());
251 switch (record.value().getType())
253 case COMMAND_CREATE_CHATROOM:
256 (CommandCreateChatRoomTo) record.value(),
260 case EVENT_CHATMESSAGE_RECEIVED:
// NOTE(review): the record timestamp is interpreted as epoch SECONDS here. Kafka record
// timestamps are usually epoch milliseconds, so this is only correct if the sending
// side stamps its records in seconds - confirm against the producer code.
261 Instant instant = Instant.ofEpochSecond(record.timestamp());
262 LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
267 (EventChatMessageReceivedTo) record.value(),
273 "Ignoring message for chat-room {} with offset {}: {}",
// Remember where to continue reading in this partition.
279 nextOffset[record.partition()] = record.offset() + 1;
// Builds the state of a chat room from a create command; per the log text below
// this is the variant used while loading.
283 private void createChatRoom(
285 CommandCreateChatRoomTo createChatRoomRequestTo,
289 "Loading ChatRoom {} for shard {} with buffer-size {}",
// The service of the chat room is bound to this channel and the chat-room id.
293 KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
294 ChatRoomData chatRoomData = new ChatRoomData(
300 createChatRoomRequestTo.getName(),
// Builds the state of a newly created chat room from its ChatRoomInfo.
306 private void createChatRoom(ChatRoomInfo chatRoomInfo)
308 UUID id = chatRoomInfo.getId();
309 log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
// The service of the chat room is bound to this channel and the chat-room id.
310 KafkaChatRoomService service = new KafkaChatRoomService(this, id);
311 ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
// Id, name and shard are passed on (the call they belong to is not visible in this excerpt).
313 chatRoomInfo.getId(),
314 chatRoomInfo.getName(),
315 chatRoomInfo.getShard(),
// Rebuilds a Message from a stored event and hands it to the service of its chat room.
319 private void loadChatMessage(
321 LocalDateTime timestamp,
323 EventChatMessageReceivedTo chatMessageTo,
326 Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
327 Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
// NOTE(review): Map.get yields null for an unknown chat-room id, and the result is
// dereferenced below without a check - confirm that the create command is always seen first.
329 ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId);
330 KafkaChatRoomService kafkaChatRoomService =
331 (KafkaChatRoomService) chatRoomData.getChatRoomService();
333 kafkaChatRoomService.persistMessage(message);
// Tells whether every owned shard has been read up to the end offset that was
// captured when its partition was assigned.
336 private boolean isLoadingCompleted()
340 .filter(shard -> isShardOwned[shard])
341 .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
// Pauses fetching for the partitions of all shards that are currently owned.
344 private void pauseAllOwnedPartions()
346 consumer.pause(IntStream
348 .filter(shard -> isShardOwned[shard])
349 .mapToObj(shard -> new TopicPartition(topic, shard))
// Registers a chat room (info and data) in the maps of its partition.
354 private void putChatRoom(
358 ChatRoomData chatRoomData)
// An id that is already known is only reported, see the log text below.
360 if (this.chatRoomInfo[partition].containsKey(chatRoomId))
363 "Ignoring existing chat-room for {}: {}",
370 "Adding new chat-room to partition {}: {}",
374 this.chatRoomInfo[partition].put(
376 new ChatRoomInfo(chatRoomId, name, partition));
377 this.chatRoomData[partition].put(chatRoomId, chatRoomData);
// Returns shard numbers, restricted to the shards that are currently owned.
381 int[] getOwnedShards()
385 .filter(shard -> isShardOwned[shard])
// Looks up the state of one chat room in the given shard.
// The result is empty if the id is unknown in that shard.
389 Mono<ChatRoomData> getChatRoomData(int shard, UUID id)
// Error signal for an ongoing load (the guarding condition is not visible in this excerpt).
393 return Mono.error(new LoadInProgressException());
// Shards owned by another instance cannot be answered here.
396 if (!isShardOwned[shard])
398 return Mono.error(new ShardNotOwnedException(shard));
401 return Mono.justOrEmpty(chatRoomData[shard].get(id));
// Emits the info of all chat rooms that live in currently owned shards.
// NOTE(review): no load-in-progress check is visible here, unlike in the single lookup - confirm.
404 Flux<ChatRoomInfo> getChatRoomInfo()
407 .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
408 .filter(shard -> isShardOwned[shard])
409 .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values()));
// Looks up the info of one chat room in the given shard.
// The result is empty if the id is unknown in that shard.
412 Mono<ChatRoomInfo> getChatRoomInfo(int shard, UUID id)
// Error signal for an ongoing load (the guarding condition is not visible in this excerpt).
416 return Mono.error(new LoadInProgressException());
// Shards owned by another instance cannot be answered here.
419 if (!isShardOwned[shard])
421 return Mono.error(new ShardNotOwnedException(shard));
424 return Mono.justOrEmpty(chatRoomInfo[shard].get(id));
// Emits the state of all chat rooms that live in currently owned shards.
// NOTE(review): no load-in-progress check is visible here, unlike in the single lookup - confirm.
427 Flux<ChatRoomData> getChatRoomData()
430 .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
431 .filter(shard -> isShardOwned[shard])
432 .flatMap(shard -> Flux.fromIterable(chatRoomData[shard].values()));