From: Kai Moritz Date: Mon, 4 Mar 2024 13:45:22 +0000 (+0100) Subject: WIP:refactor: Refined channel-states, introduced `ChannelState` -- ALIGN X-Git-Tag: rebase--2024-03-05--09-07~13 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=a13c29ed1e13945f77c1542018bcfbac2e2d79a1;p=demos%2Fkafka%2Fchat WIP:refactor: Refined channel-states, introduced `ChannelState` -- ALIGN --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChannelNotReadyException.java b/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChannelNotReadyException.java deleted file mode 100644 index 7d8f3d55..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChannelNotReadyException.java +++ /dev/null @@ -1,16 +0,0 @@ -package de.juplo.kafka.chat.backend.domain.exceptions; - - -import de.juplo.kafka.chat.backend.implementation.kafka.DataChannel; - - -public class ChannelNotReadyException extends IllegalStateException -{ - public final DataChannel.State state; - - public ChannelNotReadyException(DataChannel.State state) - { - super("Not ready! Current state: " + state); - this.state = state; - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelNotReadyException.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelNotReadyException.java index 808187be..0746748a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelNotReadyException.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelNotReadyException.java @@ -3,8 +3,11 @@ package de.juplo.kafka.chat.backend.implementation.kafka; public class ChannelNotReadyException extends IllegalStateException { - public ChannelNotReadyException() + public final ChannelState state; + + public ChannelNotReadyException(ChannelState state) { - super("Load in progress..."); + super("Not ready! Current state: " + state); + this.state = state; } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelState.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelState.java new file mode 100644 index 00000000..554b4d66 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelState.java @@ -0,0 +1,9 @@ +package de.juplo.kafka.chat.backend.implementation.kafka; + +public enum ChannelState +{ + STARTING, + LOAD_IN_PROGRESS, + READY, + SHUTTING_DOWN +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 4daaef10..5a5a6838 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -1,6 +1,9 @@ package de.juplo.kafka.chat.backend.implementation.kafka; -import de.juplo.kafka.chat.backend.domain.*; +import de.juplo.kafka.chat.backend.domain.ChatRoomData; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; @@ -14,7 +17,10 @@ import org.apache.kafka.common.errors.WakeupException; import reactor.core.publisher.Mono; import java.time.*; -import java.util.*; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; import java.util.stream.IntStream; @@ -39,7 +45,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener private boolean running; @Getter - private volatile State state = State.STARTING; + private volatile ChannelState channelState = ChannelState.STARTING; public DataChannel( @@ -128,7 +134,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener public void onPartitionsAssigned(Collection partitions) { log.info("Newly assigned partitions! Pausing normal operations..."); - state = State.LOAD_IN_PROGRESS; + channelState = ChannelState.LOAD_IN_PROGRESS; consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) -> { @@ -195,7 +201,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener ConsumerRecords records = consumer.poll(pollingInterval); log.info("Fetched {} messages", records.count()); - switch (state) + switch (channelState) { case LOAD_IN_PROGRESS -> { @@ -206,7 +212,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener log.info("Loading of messages completed! Pausing all owned partitions..."); pauseAllOwnedPartions(); log.info("Resuming normal operations..."); - state = State.READY; + channelState = ChannelState.READY; } } case SHUTTING_DOWN -> log.info("Shutdown in progress: ignoring {} fetched messages.", records.count()); @@ -214,7 +220,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener { if (!records.isEmpty()) { - throw new IllegalStateException("All owned partitions should be paused, when in state " + state); + throw new IllegalStateException("All owned partitions should be paused, when in state " + channelState); } } } @@ -320,8 +326,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener Mono getChatRoomData(int shard, UUID id) { - State capturedState = state; - if (capturedState != State.READY) + ChannelState capturedState = channelState; + if (capturedState != ChannelState.READY) { return Mono.error(new ChannelNotReadyException(capturedState)); } @@ -359,12 +365,4 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener { return consumer.groupMetadata(); } - - public enum State - { - STARTING, - LOAD_IN_PROGRESS, - READY, - SHUTTING_DOWN - } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index 80938fcd..13556dab 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -17,7 +17,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.net.URI; -import java.time.*; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -41,7 +41,7 @@ public class InfoChannel implements Runnable private boolean running; @Getter - private volatile boolean loadInProgress = true; + private volatile ChannelState channelState = ChannelState.STARTING; public InfoChannel( @@ -191,7 +191,7 @@ public class InfoChannel implements Runnable IntStream .range(0, numShards) .forEach(partition -> this.nextOffset[partition] = 0l); - loadInProgress = true; + channelState = ChannelState.LOAD_IN_PROGRESS; while (running) { @@ -218,10 +218,15 @@ public class InfoChannel implements Runnable private void updateNextOffset(int partition, long nextOffset) { this.nextOffset[partition] = nextOffset; - if (loadInProgress) { - loadInProgress = IntStream + if (channelState == ChannelState.LOAD_IN_PROGRESS) + { + boolean loadInProgress = IntStream .range(0, numShards) .anyMatch(shard -> this.nextOffset[shard] < currentOffset[partition]); + if (!loadInProgress) + { + channelState = ChannelState.READY; + } } } @@ -295,9 +300,10 @@ public class InfoChannel implements Runnable Mono getChatRoomInfo(UUID id) { - if (loadInProgress) + ChannelState capturedState = channelState; + if (capturedState != ChannelState.READY) { - return Mono.error(new ChannelNotReadyException()); + return Mono.error(new ChannelNotReadyException(capturedState)); } return Mono.fromSupplier(() -> chatRoomInfo.get(id));