private boolean running;
@Getter
- private volatile boolean loadInProgress;
+ private volatile State state = State.STARTING;
public DataChannel(
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
{
log.info("Newly assigned partitions! Pausing normal operations...");
- loadInProgress = true;
+ state = State.LOAD_IN_PROGRESS;
consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
{
ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(pollingInterval);
log.info("Fetched {} messages", records.count());
- if (loadInProgress)
+ switch (state)
{
- loadChatRoomData(records);
-
- if (isLoadingCompleted())
+ case LOAD_IN_PROGRESS ->
{
- log.info("Loading of messages completed! Pausing all owned partitions...");
- pauseAllOwnedPartions();
- log.info("Resuming normal operations...");
- loadInProgress = false;
+ loadChatRoomData(records);
+
+ if (isLoadingCompleted())
+ {
+ log.info("Loading of messages completed! Pausing all owned partitions...");
+ pauseAllOwnedPartions();
+ log.info("Resuming normal operations...");
+ state = State.READY;
+ }
}
- }
- else
- {
- if (!records.isEmpty())
+ case SHUTTING_DOWN -> log.info("Shutdown in progress: ignoring {} fetched messages.", records.count());
+ default ->
{
- throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
+ if (!records.isEmpty())
+ {
+ throw new IllegalStateException("All owned partitions should be paused, when in state " + state);
+ }
}
}
}
Mono<ChatRoomData> getChatRoomData(int shard, UUID id)
{
- if (loadInProgress)
+ State capturedState = state;
+ if (capturedState != State.READY)
{
- return Mono.error(new ChannelNotReadyException());
+ return Mono.error(new ChannelNotReadyException(capturedState));
}
if (!isShardOwned[shard])
{
return consumer.groupMetadata();
}
+
+ public enum State
+ {
+ STARTING,
+ LOAD_IN_PROGRESS,
+ READY,
+ SHUTTING_DOWN
+ }
}
log.info("Sleeping for 3 seconds...");
Thread.sleep(3000);
+ log.info("Starting Backend 2...");
+ containers.backend2.start();
+ log.info("Backend 2 started");
+
+ log.info("Sleeping for 3 seconds...");
+ Thread.sleep(3000);
+
+ log.info("Starting Backend 3...");
+ containers.backend3.start();
+ log.info("Backend 3 started");
+
+ log.info("Sleeping for 3 seconds...");
+ Thread.sleep(3000);
+
+ log.info("Shutting down writers...");
for (int i = 0; i < NUM_CLIENTS; i++)
{
+ log.info("Shutting down writer {}", i);
testWriters[i].running = false;
testWriterFutures[i].join();
log.info("Joined TestWriter {}", testWriters[i].user);
.delayElements(Duration.ofMillis(ThreadLocalRandom.current().nextLong(500, 1500)))
.map(i -> "Message #" + i)
.flatMap(message -> sendMessage(chatRoom, message)
- .retryWhen(Retry.fixedDelay(10, Duration.ofSeconds(1))))
+ .retryWhen(Retry.backoff(10, Duration.ofSeconds(1))))
.doOnNext(message ->
{
sentMessages.add(message);