From: Kai Moritz Date: Mon, 4 Mar 2024 13:37:23 +0000 (+0100) Subject: WIP:refactor: Refined channel-states, introduced `ChannelState` -- ALIGN X-Git-Tag: rebase--2024-03-05--09-07~14 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=546fde9303d6b36fedc18bfe6a0e59065c2afe4a;p=demos%2Fkafka%2Fchat WIP:refactor: Refined channel-states, introduced `ChannelState` -- ALIGN --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChannelNotReadyException.java b/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChannelNotReadyException.java new file mode 100644 index 00000000..7d8f3d55 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/exceptions/ChannelNotReadyException.java @@ -0,0 +1,16 @@ +package de.juplo.kafka.chat.backend.domain.exceptions; + + +import de.juplo.kafka.chat.backend.implementation.kafka.DataChannel; + + +public class ChannelNotReadyException extends IllegalStateException +{ + public final DataChannel.State state; + + public ChannelNotReadyException(DataChannel.State state) + { + super("Not ready! Current state: " + state); + this.state = state; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 3af0a570..4daaef10 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -39,7 +39,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener private boolean running; @Getter - private volatile boolean loadInProgress; + private volatile State state = State.STARTING; public DataChannel( @@ -128,7 +128,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener public void onPartitionsAssigned(Collection partitions) { log.info("Newly assigned partitions! Pausing normal operations..."); - loadInProgress = true; + state = State.LOAD_IN_PROGRESS; consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) -> { @@ -195,23 +195,27 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener ConsumerRecords records = consumer.poll(pollingInterval); log.info("Fetched {} messages", records.count()); - if (loadInProgress) + switch (state) { - loadChatRoomData(records); - - if (isLoadingCompleted()) + case LOAD_IN_PROGRESS -> { - log.info("Loading of messages completed! Pausing all owned partitions..."); - pauseAllOwnedPartions(); - log.info("Resuming normal operations..."); - loadInProgress = false; + loadChatRoomData(records); + + if (isLoadingCompleted()) + { + log.info("Loading of messages completed! Pausing all owned partitions..."); + pauseAllOwnedPartions(); + log.info("Resuming normal operations..."); + state = State.READY; + } } - } - else - { - if (!records.isEmpty()) + case SHUTTING_DOWN -> log.info("Shutdown in progress: ignoring {} fetched messages.", records.count()); + default -> { - throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!"); + if (!records.isEmpty()) + { + throw new IllegalStateException("All owned partitions should be paused, when in state " + state); + } } } } @@ -316,9 +320,10 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener Mono getChatRoomData(int shard, UUID id) { - if (loadInProgress) + State capturedState = state; + if (capturedState != State.READY) { - return Mono.error(new ChannelNotReadyException()); + return Mono.error(new ChannelNotReadyException(capturedState)); } if (!isShardOwned[shard]) @@ -354,4 +359,12 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener { return consumer.groupMetadata(); } + + public enum State + { + STARTING, + LOAD_IN_PROGRESS, + READY, + SHUTTING_DOWN + } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java index bac07fff..b3351559 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/AbstractHandoverIT.java @@ -69,8 +69,24 @@ public abstract class AbstractHandoverIT log.info("Sleeping for 3 seconds..."); Thread.sleep(3000); + log.info("Starting Backend 2..."); + containers.backend2.start(); + log.info("Backend 2 started"); + + log.info("Sleeping for 3 seconds..."); + Thread.sleep(3000); + + log.info("Starting Backend 3..."); + containers.backend3.start(); + log.info("Backend 3 started"); + + log.info("Sleeping for 3 seconds..."); + Thread.sleep(3000); + + log.info("Shutting down writers..."); for (int i = 0; i < NUM_CLIENTS; i++) { + log.info("Shutting down writer {}", i); testWriters[i].running = false; testWriterFutures[i].join(); log.info("Joined TestWriter {}", testWriters[i].user); diff --git a/src/test/java/de/juplo/kafka/chat/backend/TestWriter.java b/src/test/java/de/juplo/kafka/chat/backend/TestWriter.java index d4cc7195..c21acda1 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/TestWriter.java +++ b/src/test/java/de/juplo/kafka/chat/backend/TestWriter.java @@ -45,7 +45,7 @@ public class TestWriter .delayElements(Duration.ofMillis(ThreadLocalRandom.current().nextLong(500, 1500))) .map(i -> "Message #" + i) .flatMap(message -> sendMessage(chatRoom, message) - .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1)))) + .retryWhen(Retry.backoff(10, Duration.ofSeconds(1)))) .doOnNext(message -> { sentMessages.add(message);