From 58522d1ddff4795d9c924d90cd1695d6c8d30a38 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 11 Sep 2023 18:36:05 +0200 Subject: [PATCH] refactor: Separated channels for data and info -- Moved/copied classes * Split `ChatRoomChannel` into `InfoChannel` and `DataChannel` ** `DataChannel` manages only data for chat-messages ** `InfoChannel` manages all info-data (at the moment only `EventChatRoomCreated`) * Aligned test-setup for kafka-related tests --- ...{ChatRoomChannel.java => DataChannel.java} | 0 .../implementation/kafka/InfoChannel.java | 428 ++++++++++++++++++ .../messages/CommandCreateChatRoomTo.java | 27 -- .../EventChatMessageReceivedTo.java | 0 .../messages/info/EventChatRoomCreated.java | 2 + .../messages/CommandCreateChatRoomToTest.java | 35 -- .../EventChatMessageReceivedToTest.java | 0 7 files changed, 430 insertions(+), 62 deletions(-) rename src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/{ChatRoomChannel.java => DataChannel.java} (100%) create mode 100644 src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java delete mode 100644 src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/CommandCreateChatRoomTo.java rename src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/{ => data}/EventChatMessageReceivedTo.java (100%) create mode 100644 src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java delete mode 100644 src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/CommandCreateChatRoomToTest.java rename src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/{ => data}/EventChatMessageReceivedToTest.java (100%) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java similarity index 100% rename from src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChatRoomChannel.java rename to src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java new file mode 100644 index 00000000..d94bc659 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -0,0 +1,428 @@ +package de.juplo.kafka.chat.backend.implementation.kafka; + +import de.juplo.kafka.chat.backend.domain.*; +import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; +import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.*; +import java.util.*; +import java.util.stream.IntStream; + + +@Slf4j +public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener +{ + private final String topic; + private final Producer producer; + private final Consumer consumer; + private final ZoneId zoneId; + private final int numShards; + private final int bufferSize; + private final Clock clock; + private final boolean[] isShardOwned; + private final long[] currentOffset; + private final long[] nextOffset; + private final Map[] chatRoomInfo; + private final Map[] chatRoomData; + + private boolean running; + @Getter + private volatile boolean loadInProgress; + + + public ChatRoomChannel( + String topic, + Producer producer, + Consumer consumer, + ZoneId zoneId, + int numShards, + int bufferSize, + Clock clock) + { + log.debug( + "Creating ChatRoomChannel for topic {} with {} partitions", + topic, + numShards); + this.topic = topic; + this.consumer = consumer; + this.producer = producer; + this.zoneId = zoneId; + this.numShards = numShards; + this.bufferSize = bufferSize; + this.clock = clock; + this.isShardOwned = new boolean[numShards]; + this.currentOffset = new long[numShards]; + this.nextOffset = new long[numShards]; + this.chatRoomInfo = new Map[numShards]; + this.chatRoomData = new Map[numShards]; + IntStream + .range(0, numShards) + .forEach(shard -> + { + this.chatRoomInfo[shard] = new HashMap<>(); + this.chatRoomData[shard] = new HashMap<>(); + }); + } + + + + Mono sendCreateChatRoomRequest( + UUID chatRoomId, + String name) + { + CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name); + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + topic, + chatRoomId.toString(), + createChatRoomRequestTo); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo); + ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition()); + createChatRoom(chatRoomInfo); + sink.success(chatRoomInfo); + } + else + { + // On send-failure + log.error( + "Could not send create-request for chat room (id={}, name={}): {}", + chatRoomId, + name, + exception); + sink.error(exception); + } + })); + }); + } + + Mono sendChatMessage( + UUID chatRoomId, + Message.MessageKey key, + LocalDateTime timestamp, + String text) + { + ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + topic, + null, + zdt.toEpochSecond(), + chatRoomId.toString(), + EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text)); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + // On successful send + Message message = new Message(key, metadata.offset(), timestamp, text); + log.info("Successfully send message {}", message); + sink.success(message); + } + else + { + // On send-failure + log.error( + "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}", + chatRoomId, + key, + timestamp, + text, + exception); + sink.error(exception); + } + })); + }); + } + + @Override + public void onPartitionsAssigned(Collection partitions) + { + log.info("Newly assigned partitions! Pausing normal operations..."); + loadInProgress = true; + + consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) -> + { + int partition = topicPartition.partition(); + isShardOwned[partition] = true; + this.currentOffset[partition] = currentOffset; + + log.info( + "Partition assigned: {} - loading messages: next={} -> current={}", + partition, + nextOffset[partition], + currentOffset); + + consumer.seek(topicPartition, nextOffset[partition]); + }); + + consumer.resume(partitions); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitions.forEach(topicPartition -> + { + int partition = topicPartition.partition(); + isShardOwned[partition] = false; + log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); + }); + } + + @Override + public void onPartitionsLost(Collection partitions) + { + log.warn("Lost partitions: {}, partitions"); + // TODO: Muss auf den Verlust anders reagiert werden? + onPartitionsRevoked(partitions); + } + + @Override + public void run() + { + running = true; + + while (running) + { + try + { + ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + log.info("Fetched {} messages", records.count()); + + if (loadInProgress) + { + loadChatRoom(records); + + if (isLoadingCompleted()) + { + log.info("Loading of messages completed! Pausing all owned partitions..."); + pauseAllOwnedPartions(); + log.info("Resuming normal operations..."); + loadInProgress = false; + } + } + else + { + if (!records.isEmpty()) + { + throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!"); + } + } + } + catch (WakeupException e) + { + log.info("Received WakeupException, exiting!"); + running = false; + } + } + + log.info("Exiting normally"); + } + + private void loadChatRoom(ConsumerRecords records) + { + for (ConsumerRecord record : records) + { + UUID chatRoomId = UUID.fromString(record.key()); + + switch (record.value().getType()) + { + case COMMAND_CREATE_CHATROOM: + createChatRoom( + chatRoomId, + (CommandCreateChatRoomTo) record.value(), + record.partition()); + break; + + case EVENT_CHATMESSAGE_RECEIVED: + Instant instant = Instant.ofEpochSecond(record.timestamp()); + LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); + loadChatMessage( + chatRoomId, + timestamp, + record.offset(), + (EventChatMessageReceivedTo) record.value(), + record.partition()); + break; + + default: + log.debug( + "Ignoring message for chat-room {} with offset {}: {}", + chatRoomId, + record.offset(), + record.value()); + } + + nextOffset[record.partition()] = record.offset() + 1; + } + } + + private void createChatRoom( + UUID chatRoomId, + CommandCreateChatRoomTo createChatRoomRequestTo, + Integer partition) + { + log.info( + "Loading ChatRoom {} for shard {} with buffer-size {}", + chatRoomId, + partition, + bufferSize); + KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); + ChatRoomData chatRoomData = new ChatRoomData( + clock, + service, + bufferSize); + putChatRoom( + chatRoomId, + createChatRoomRequestTo.getName(), + partition, + chatRoomData); + } + + + private void createChatRoom(ChatRoomInfo chatRoomInfo) + { + UUID id = chatRoomInfo.getId(); + log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); + KafkaChatMessageService service = new KafkaChatMessageService(this, id); + ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize); + putChatRoom( + chatRoomInfo.getId(), + chatRoomInfo.getName(), + chatRoomInfo.getShard(), + chatRoomData); + } + + private void loadChatMessage( + UUID chatRoomId, + LocalDateTime timestamp, + long offset, + EventChatMessageReceivedTo chatMessageTo, + int partition) + { + Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); + Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); + + ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId); + KafkaChatMessageService kafkaChatRoomService = + (KafkaChatMessageService) chatRoomData.getChatRoomService(); + + kafkaChatRoomService.persistMessage(message); + } + + private boolean isLoadingCompleted() + { + return IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]); + } + + private void pauseAllOwnedPartions() + { + consumer.pause(IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .mapToObj(shard -> new TopicPartition(topic, shard)) + .toList()); + } + + + private void putChatRoom( + UUID chatRoomId, + String name, + Integer partition, + ChatRoomData chatRoomData) + { + if (this.chatRoomInfo[partition].containsKey(chatRoomId)) + { + log.warn( + "Ignoring existing chat-room for {}: {}", + partition, + chatRoomId); + } + else + { + log.info( + "Adding new chat-room to partition {}: {}", + partition, + chatRoomData); + + this.chatRoomInfo[partition].put( + chatRoomId, + new ChatRoomInfo(chatRoomId, name, partition)); + this.chatRoomData[partition].put(chatRoomId, chatRoomData); + } + } + + int[] getOwnedShards() + { + return IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .toArray(); + } + + Mono getChatRoomData(int shard, UUID id) + { + if (loadInProgress) + { + return Mono.error(new LoadInProgressException()); + } + + if (!isShardOwned[shard]) + { + return Mono.error(new ShardNotOwnedException(shard)); + } + + return Mono.justOrEmpty(chatRoomData[shard].get(id)); + } + + Flux getChatRoomInfo() + { + return Flux + .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i))) + .filter(shard -> isShardOwned[shard]) + .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values())); + } + + Mono getChatRoomInfo(int shard, UUID id) + { + if (loadInProgress) + { + return Mono.error(new LoadInProgressException()); + } + + if (!isShardOwned[shard]) + { + return Mono.error(new ShardNotOwnedException(shard)); + } + + return Mono.justOrEmpty(chatRoomInfo[shard].get(id)); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/CommandCreateChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/CommandCreateChatRoomTo.java deleted file mode 100644 index 29ba77c6..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/CommandCreateChatRoomTo.java +++ /dev/null @@ -1,27 +0,0 @@ -package de.juplo.kafka.chat.backend.implementation.kafka.messages; - -import lombok.*; - - -@Getter -@Setter -@EqualsAndHashCode -@ToString -public class CommandCreateChatRoomTo extends AbstractMessageTo -{ - private String name; - - - public CommandCreateChatRoomTo() - { - super(ToType.COMMAND_CREATE_CHATROOM); - } - - - public static CommandCreateChatRoomTo of(String name) - { - CommandCreateChatRoomTo to = new CommandCreateChatRoomTo(); - to.name = name; - return to; - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/EventChatMessageReceivedTo.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedTo.java similarity index 100% rename from src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/EventChatMessageReceivedTo.java rename to src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedTo.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java new file mode 100644 index 00000000..0cf42320 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java @@ -0,0 +1,2 @@ +package de.juplo.kafka.chat.backend.implementation.kafka.messages.info;public class EventChatRoomCreated { +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/CommandCreateChatRoomToTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/CommandCreateChatRoomToTest.java deleted file mode 100644 index b0311126..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/CommandCreateChatRoomToTest.java +++ /dev/null @@ -1,35 +0,0 @@ -package de.juplo.kafka.chat.backend.implementation.kafka.messages; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; - - -public class CommandCreateChatRoomToTest -{ - final String json = """ - { - "name": "Foo-Room!" - }"""; - - ObjectMapper mapper; - - @BeforeEach - public void setUp() - { - mapper = new ObjectMapper(); - mapper.registerModule(new JavaTimeModule()); - mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); - } - - @Test - public void testDeserialization() throws Exception - { - CommandCreateChatRoomTo message = mapper.readValue(json, CommandCreateChatRoomTo.class); - assertThat(message.getName()).isEqualTo("Foo-Room!"); - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/EventChatMessageReceivedToTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedToTest.java similarity index 100% rename from src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/EventChatMessageReceivedToTest.java rename to src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedToTest.java -- 2.20.1