1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.*;
4 import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
5 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
6 import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
7 import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
8 import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
10 import lombok.extern.slf4j.Slf4j;
11 import org.apache.kafka.clients.consumer.Consumer;
12 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
13 import org.apache.kafka.clients.consumer.ConsumerRecord;
14 import org.apache.kafka.clients.consumer.ConsumerRecords;
15 import org.apache.kafka.clients.producer.Producer;
16 import org.apache.kafka.clients.producer.ProducerRecord;
17 import org.apache.kafka.common.TopicPartition;
18 import org.apache.kafka.common.errors.WakeupException;
19 import reactor.core.publisher.Flux;
20 import reactor.core.publisher.Mono;
24 import java.util.stream.IntStream;
28 public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
30 private final String topic;
31 private final Producer<String, AbstractMessageTo> producer;
32 private final Consumer<String, AbstractMessageTo> consumer;
33 private final ZoneId zoneId;
34 private final int numShards;
35 private final int bufferSize;
36 private final Clock clock;
37 private final boolean[] isShardOwned;
38 private final long[] currentOffset;
39 private final long[] nextOffset;
40 private final Map<UUID, ChatRoomInfo>[] chatRoomInfo;
41 private final Map<UUID, ChatRoomData>[] chatRoomData;
43 private boolean running;
45 private volatile boolean loadInProgress;
48 public ChatRoomChannel(
50 Producer<String, AbstractMessageTo> producer,
51 Consumer<String, AbstractMessageTo> consumer,
58 "Creating ChatRoomChannel for topic {} with {} partitions",
62 this.consumer = consumer;
63 this.producer = producer;
65 this.numShards = numShards;
66 this.bufferSize = bufferSize;
68 this.isShardOwned = new boolean[numShards];
69 this.currentOffset = new long[numShards];
70 this.nextOffset = new long[numShards];
71 this.chatRoomInfo = new Map[numShards];
72 this.chatRoomData = new Map[numShards];
77 this.chatRoomInfo[shard] = new HashMap<>();
78 this.chatRoomData[shard] = new HashMap<>();
84 Mono<ChatRoomInfo> sendCreateChatRoomRequest(
88 CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
89 return Mono.create(sink ->
91 ProducerRecord<String, AbstractMessageTo> record =
94 chatRoomId.toString(),
95 createChatRoomRequestTo);
97 producer.send(record, ((metadata, exception) ->
101 log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
102 ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition());
103 createChatRoom(chatRoomInfo);
104 sink.success(chatRoomInfo);
110 "Could not send create-request for chat room (id={}, name={}): {}",
114 sink.error(exception);
120 Mono<Message> sendChatMessage(
122 Message.MessageKey key,
123 LocalDateTime timestamp,
126 ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
127 return Mono.create(sink ->
129 ProducerRecord<String, AbstractMessageTo> record =
130 new ProducerRecord<>(
134 chatRoomId.toString(),
135 EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
137 producer.send(record, ((metadata, exception) ->
139 if (metadata != null)
141 // On successful send
142 Message message = new Message(key, metadata.offset(), timestamp, text);
143 log.info("Successfully send message {}", message);
144 sink.success(message);
150 "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
156 sink.error(exception);
163 public void onPartitionsAssigned(Collection<TopicPartition> partitions)
165 log.info("Newly assigned partitions! Pausing normal operations...");
166 loadInProgress = true;
168 consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
170 int partition = topicPartition.partition();
171 isShardOwned[partition] = true;
172 this.currentOffset[partition] = currentOffset;
175 "Partition assigned: {} - loading messages: next={} -> current={}",
177 nextOffset[partition],
180 consumer.seek(topicPartition, nextOffset[partition]);
183 consumer.resume(partitions);
187 public void onPartitionsRevoked(Collection<TopicPartition> partitions)
189 partitions.forEach(topicPartition ->
191 int partition = topicPartition.partition();
192 isShardOwned[partition] = false;
193 log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
198 public void onPartitionsLost(Collection<TopicPartition> partitions)
200 log.warn("Lost partitions: {}, partitions");
201 // TODO: Muss auf den Verlust anders reagiert werden?
202 onPartitionsRevoked(partitions);
214 ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
215 log.info("Fetched {} messages", records.count());
219 loadChatRoom(records);
221 if (isLoadingCompleted())
223 log.info("Loading of messages completed! Pausing all owned partitions...");
224 pauseAllOwnedPartions();
225 log.info("Resuming normal operations...");
226 loadInProgress = false;
231 if (!records.isEmpty())
233 throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
237 catch (WakeupException e)
239 log.info("Received WakeupException, exiting!");
244 log.info("Exiting normally");
247 private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
249 for (ConsumerRecord<String, AbstractMessageTo> record : records)
251 UUID chatRoomId = UUID.fromString(record.key());
253 switch (record.value().getType())
255 case COMMAND_CREATE_CHATROOM:
258 (CommandCreateChatRoomTo) record.value(),
262 case EVENT_CHATMESSAGE_RECEIVED:
263 Instant instant = Instant.ofEpochSecond(record.timestamp());
264 LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
269 (EventChatMessageReceivedTo) record.value(),
275 "Ignoring message for chat-room {} with offset {}: {}",
281 nextOffset[record.partition()] = record.offset() + 1;
285 private void createChatRoom(
287 CommandCreateChatRoomTo createChatRoomRequestTo,
291 "Loading ChatRoom {} for shard {} with buffer-size {}",
295 KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
296 ChatRoomData chatRoomData = new ChatRoomData(
302 createChatRoomRequestTo.getName(),
308 private void createChatRoom(ChatRoomInfo chatRoomInfo)
310 UUID id = chatRoomInfo.getId();
311 log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
312 KafkaChatRoomService service = new KafkaChatRoomService(this, id);
313 ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
315 chatRoomInfo.getId(),
316 chatRoomInfo.getName(),
317 chatRoomInfo.getShard(),
321 private void loadChatMessage(
323 LocalDateTime timestamp,
325 EventChatMessageReceivedTo chatMessageTo,
328 Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
329 Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
331 ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId);
332 KafkaChatRoomService kafkaChatRoomService =
333 (KafkaChatRoomService) chatRoomData.getChatRoomService();
335 kafkaChatRoomService.persistMessage(message);
338 private boolean isLoadingCompleted()
342 .filter(shard -> isShardOwned[shard])
343 .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
346 private void pauseAllOwnedPartions()
348 consumer.pause(IntStream
350 .filter(shard -> isShardOwned[shard])
351 .mapToObj(shard -> new TopicPartition(topic, shard))
356 private void putChatRoom(
360 ChatRoomData chatRoomData)
362 if (this.chatRoomInfo[partition].containsKey(chatRoomId))
365 "Ignoring existing chat-room for {}: {}",
372 "Adding new chat-room to partition {}: {}",
376 this.chatRoomInfo[partition].put(
378 new ChatRoomInfo(chatRoomId, name, partition));
379 this.chatRoomData[partition].put(chatRoomId, chatRoomData);
383 int[] getOwnedShards()
387 .filter(shard -> isShardOwned[shard])
391 Mono<ChatRoomData> getChatRoomData(int shard, UUID id)
395 return Mono.error(new LoadInProgressException());
398 if (!isShardOwned[shard])
400 return Mono.error(new ShardNotOwnedException(shard));
403 return Mono.justOrEmpty(chatRoomData[shard].get(id));
406 Flux<ChatRoomInfo> getChatRoomInfo()
409 .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
410 .filter(shard -> isShardOwned[shard])
411 .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values()));
414 Mono<ChatRoomInfo> getChatRoomInfo(int shard, UUID id)
418 return Mono.error(new LoadInProgressException());
421 if (!isShardOwned[shard])
423 return Mono.error(new ShardNotOwnedException(shard));
426 return Mono.justOrEmpty(chatRoomInfo[shard].get(id));
429 Flux<ChatRoomData> getChatRoomData()
432 .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
433 .filter(shard -> isShardOwned[shard])
434 .flatMap(shard -> Flux.fromIterable(chatRoomData[shard].values()));