1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatRoom;
4 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
7 import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
8 import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
9 import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
11 import lombok.extern.slf4j.Slf4j;
12 import org.apache.kafka.clients.consumer.Consumer;
13 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
14 import org.apache.kafka.clients.consumer.ConsumerRecord;
15 import org.apache.kafka.clients.consumer.ConsumerRecords;
16 import org.apache.kafka.clients.producer.Producer;
17 import org.apache.kafka.clients.producer.ProducerRecord;
18 import org.apache.kafka.common.TopicPartition;
19 import org.apache.kafka.common.errors.WakeupException;
20 import reactor.core.publisher.Flux;
21 import reactor.core.publisher.Mono;
25 import java.util.stream.IntStream;
29 public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
31 private final String topic;
32 private final Producer<String, AbstractMessageTo> producer;
33 private final Consumer<String, AbstractMessageTo> consumer;
34 private final ZoneId zoneId;
35 private final int numShards;
36 private final int bufferSize;
37 private final Clock clock;
38 private final boolean[] isShardOwned;
39 private final long[] currentOffset;
40 private final long[] nextOffset;
41 private final Map<UUID, ChatRoom>[] chatrooms;
43 private boolean running;
45 private volatile boolean loadInProgress;
48 public ChatRoomChannel(
50 Producer<String, AbstractMessageTo> producer,
51 Consumer<String, AbstractMessageTo> consumer,
58 "Creating ChatRoomChannel for topic {} with {} partitions",
62 this.consumer = consumer;
63 this.producer = producer;
65 this.numShards = numShards;
66 this.bufferSize = bufferSize;
68 this.isShardOwned = new boolean[numShards];
69 this.currentOffset = new long[numShards];
70 this.nextOffset = new long[numShards];
71 this.chatrooms = new Map[numShards];
74 .forEach(shard -> this.chatrooms[shard] = new HashMap<>());
79 Mono<ChatRoomInfo> sendCreateChatRoomRequest(
83 CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
84 return Mono.create(sink ->
86 ProducerRecord<String, AbstractMessageTo> record =
89 chatRoomId.toString(),
90 createChatRoomRequestTo);
92 producer.send(record, ((metadata, exception) ->
96 log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
97 ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition());
98 createChatRoom(chatRoomInfo);
99 sink.success(chatRoomInfo);
105 "Could not send create-request for chat room (id={}, name={}): {}",
109 sink.error(exception);
115 Mono<Message> sendChatMessage(
117 Message.MessageKey key,
118 LocalDateTime timestamp,
121 ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
122 return Mono.create(sink ->
124 ProducerRecord<String, AbstractMessageTo> record =
125 new ProducerRecord<>(
129 chatRoomId.toString(),
130 EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
132 producer.send(record, ((metadata, exception) ->
134 if (metadata != null)
136 // On successful send
137 Message message = new Message(key, metadata.offset(), timestamp, text);
138 log.info("Successfully send message {}", message);
139 sink.success(message);
145 "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
151 sink.error(exception);
158 public void onPartitionsAssigned(Collection<TopicPartition> partitions)
160 log.info("Newly assigned partitions! Pausing normal operations...");
161 loadInProgress = true;
163 consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
165 int partition = topicPartition.partition();
166 isShardOwned[partition] = true;
167 this.currentOffset[partition] = currentOffset;
170 "Partition assigned: {} - loading messages: next={} -> current={}",
172 nextOffset[partition],
175 consumer.seek(topicPartition, nextOffset[partition]);
178 consumer.resume(partitions);
182 public void onPartitionsRevoked(Collection<TopicPartition> partitions)
184 partitions.forEach(topicPartition ->
186 int partition = topicPartition.partition();
187 isShardOwned[partition] = false;
188 log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
193 public void onPartitionsLost(Collection<TopicPartition> partitions)
195 log.warn("Lost partitions: {}, partitions");
196 // TODO: Muss auf den Verlust anders reagiert werden?
197 onPartitionsRevoked(partitions);
209 ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
210 log.info("Fetched {} messages", records.count());
214 loadChatRoom(records);
216 if (isLoadingCompleted())
218 log.info("Loading of messages completed! Pausing all owned partitions...");
219 pauseAllOwnedPartions();
220 log.info("Resuming normal operations...");
221 loadInProgress = false;
226 if (!records.isEmpty())
228 throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
232 catch (WakeupException e)
234 log.info("Received WakeupException, exiting!");
239 log.info("Exiting normally");
242 private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
244 for (ConsumerRecord<String, AbstractMessageTo> record : records)
246 UUID chatRoomId = UUID.fromString(record.key());
248 switch (record.value().getType())
250 case COMMAND_CREATE_CHATROOM:
253 (CommandCreateChatRoomTo) record.value(),
257 case EVENT_CHATMESSAGE_RECEIVED:
258 Instant instant = Instant.ofEpochSecond(record.timestamp());
259 LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
264 (EventChatMessageReceivedTo) record.value(),
270 "Ignoring message for chat-room {} with offset {}: {}",
276 nextOffset[record.partition()] = record.offset() + 1;
280 private void createChatRoom(
282 CommandCreateChatRoomTo createChatRoomRequestTo,
285 log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize);
286 KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
287 ChatRoom chatRoom = new ChatRoom(
289 createChatRoomRequestTo.getName(),
294 putChatRoom(chatRoom);
298 private void createChatRoom(ChatRoomInfo chatRoomInfo)
300 UUID id = chatRoomInfo.getId();
301 String name = chatRoomInfo.getName();
302 int shard = chatRoomInfo.getShard();
303 log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
304 KafkaChatRoomService service = new KafkaChatRoomService(this, id);
305 ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
306 putChatRoom(chatRoom);
309 private void loadChatMessage(
311 LocalDateTime timestamp,
313 EventChatMessageReceivedTo chatMessageTo,
316 Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
317 Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
319 ChatRoom chatRoom = chatrooms[partition].get(chatRoomId);
320 KafkaChatRoomService kafkaChatRoomService =
321 (KafkaChatRoomService) chatRoom.getChatRoomService();
323 kafkaChatRoomService.persistMessage(message);
326 private boolean isLoadingCompleted()
330 .filter(shard -> isShardOwned[shard])
331 .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
334 private void pauseAllOwnedPartions()
336 consumer.pause(IntStream
338 .filter(shard -> isShardOwned[shard])
339 .mapToObj(shard -> new TopicPartition(topic, shard))
344 private void putChatRoom(ChatRoom chatRoom)
346 Integer partition = chatRoom.getShard();
347 UUID chatRoomId = chatRoom.getId();
348 if (chatrooms[partition].containsKey(chatRoomId))
350 log.warn("Ignoring existing chat-room: " + chatRoom);
355 "Adding new chat-room to partition {}: {}",
359 chatrooms[partition].put(chatRoomId, chatRoom);
363 int[] getOwnedShards()
367 .filter(shard -> isShardOwned[shard])
371 Mono<ChatRoom> getChatRoom(int shard, UUID id)
375 throw new LoadInProgressException(shard);
378 if (!isShardOwned[shard])
380 throw new ShardNotOwnedException(shard);
383 return Mono.justOrEmpty(chatrooms[shard].get(id));
386 Flux<ChatRoom> getChatRooms()
389 .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
390 .filter(shard -> isShardOwned[shard])
391 .flatMap(shard -> Flux.fromIterable(chatrooms[shard].values()));