X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FDataChannel.java;h=abe51f4a61aa5c737ab7d8ba793115e1cd29745e;hb=89b5eb1eaf7890c830848dbc4281add2bff41131;hp=e1754a157f978c33f5a617e16e9ea688f60fd5d6;hpb=0f13dc5e88722ca7c238258747041d9663251356;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index e1754a15..abe51f4a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -1,11 +1,14 @@ package de.juplo.kafka.chat.backend.implementation.kafka; -import de.juplo.kafka.chat.backend.domain.*; -import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; +import de.juplo.kafka.chat.backend.domain.ChatRoomData; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; import lombok.Getter; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.Producer; @@ -15,12 +18,16 @@ import org.apache.kafka.common.errors.WakeupException; import reactor.core.publisher.Mono; import java.time.*; -import java.util.*; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; import java.util.stream.IntStream; +@ToString(of = { "topic", "instanceId" }) @Slf4j -public class DataChannel implements Runnable, ConsumerRebalanceListener +public class DataChannel implements Channel, ConsumerRebalanceListener { private final String instanceId; private final String topic; @@ -29,7 +36,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener private final ZoneId zoneId; private final int numShards; private final Duration pollingInterval; - private final int bufferSize; + private final int historyLimit; private final Clock clock; private final boolean[] isShardOwned; private final long[] currentOffset; @@ -40,7 +47,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener private boolean running; @Getter - private volatile boolean loadInProgress; + private volatile ChannelState channelState = ChannelState.STARTING; public DataChannel( @@ -51,7 +58,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener ZoneId zoneId, int numShards, Duration pollingInterval, - int bufferSize, + int historyLimit, Clock clock, ChannelMediator channelMediator, ShardingPublisherStrategy shardingPublisherStrategy) @@ -68,7 +75,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener this.zoneId = zoneId; this.numShards = numShards; this.pollingInterval = pollingInterval; - this.bufferSize = bufferSize; + this.historyLimit = historyLimit; this.clock = clock; this.isShardOwned = new boolean[numShards]; this.currentOffset = new long[numShards]; @@ -129,7 +136,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener public void onPartitionsAssigned(Collection partitions) { log.info("Newly assigned partitions! Pausing normal operations..."); - loadInProgress = true; + channelState = ChannelState.LOAD_IN_PROGRESS; consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) -> { @@ -171,7 +178,13 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener int partition = topicPartition.partition(); isShardOwned[partition] = false; nextOffset[partition] = consumer.position(topicPartition); + log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); + + chatRoomData[partition] + .values() + .forEach(chatRoomData -> chatRoomData.deactivate()); + channelMediator.shardRevoked(partition); }); } @@ -196,29 +209,35 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener ConsumerRecords records = consumer.poll(pollingInterval); log.info("Fetched {} messages", records.count()); - if (loadInProgress) + switch (channelState) { - loadChatRoomData(records); - - if (isLoadingCompleted()) + case LOAD_IN_PROGRESS -> { - log.info("Loading of messages completed! Pausing all owned partitions..."); - pauseAllOwnedPartions(); - log.info("Resuming normal operations..."); - loadInProgress = false; + loadChatRoomData(records); + + if (isLoadingCompleted()) + { + log.info("Loading of messages completed! Pausing all owned partitions..."); + pauseAllOwnedPartions(); + activateAllOwnedChatRooms(); + log.info("Resuming normal operations..."); + channelState = ChannelState.READY; + } } - } - else - { - if (!records.isEmpty()) + case SHUTTING_DOWN -> log.info("Shutdown in progress: ignoring {} fetched messages.", records.count()); + default -> { - throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!"); + if (!records.isEmpty()) + { + throw new IllegalStateException("All owned partitions should be paused, when in state " + channelState); + } } } } catch (WakeupException e) { log.info("Received WakeupException, exiting!"); + channelState = ChannelState.SHUTTING_DOWN; running = false; } } @@ -301,6 +320,16 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener .toList()); } + private void activateAllOwnedChatRooms() + { + IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .forEach(shard -> chatRoomData[shard] + .values() + .forEach(chatRoomData -> chatRoomData.activate())); + } + int[] getOwnedShards() { @@ -312,14 +341,25 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener void createChatRoomData(ChatRoomInfo chatRoomInfo) { - computeChatRoomData(chatRoomInfo.getId(), chatRoomInfo.getShard()); + int shard = chatRoomInfo.getShard(); + + ChatRoomData chatRoomData = computeChatRoomData( + chatRoomInfo.getId(), + chatRoomInfo.getShard()); + + // TODO: Possible race-condition in case of an ongoing rebalance! + if (isShardOwned[shard]) + { + chatRoomData.activate(); + } } Mono getChatRoomData(int shard, UUID id) { - if (loadInProgress) + ChannelState capturedState = channelState; + if (capturedState != ChannelState.READY) { - return Mono.error(new LoadInProgressException()); + return Mono.error(new ChannelNotReadyException(capturedState)); } if (!isShardOwned[shard]) @@ -342,9 +382,9 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener } else { - log.info("Creating ChatRoomData {} with buffer-size {}", chatRoomId, bufferSize); + log.info("Creating ChatRoomData {} with history-limit {}", chatRoomId, historyLimit); KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); - chatRoomData = new ChatRoomData(clock, service, bufferSize); + chatRoomData = new ChatRoomData(clock, service, historyLimit); this.chatRoomData[shard].put(chatRoomId, chatRoomData); }