package de.juplo.kafka.chat.backend.implementation.kafka;
-import de.juplo.kafka.chat.backend.domain.*;
+import de.juplo.kafka.chat.backend.domain.ChatRoomData;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
import reactor.core.publisher.Mono;
import java.time.*;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
import java.util.stream.IntStream;
private boolean running;
@Getter
- private volatile State state = State.STARTING;
+ private volatile ChannelState channelState = ChannelState.STARTING;
public DataChannel(
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
{
log.info("Newly assigned partitions! Pausing normal operations...");
- state = State.LOAD_IN_PROGRESS;
+ channelState = ChannelState.LOAD_IN_PROGRESS;
consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
{
ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(pollingInterval);
log.info("Fetched {} messages", records.count());
- switch (state)
+ switch (channelState)
{
case LOAD_IN_PROGRESS ->
{
log.info("Loading of messages completed! Pausing all owned partitions...");
pauseAllOwnedPartions();
log.info("Resuming normal operations...");
- state = State.READY;
+ channelState = ChannelState.READY;
}
}
case SHUTTING_DOWN -> log.info("Shutdown in progress: ignoring {} fetched messages.", records.count());
{
if (!records.isEmpty())
{
- throw new IllegalStateException("All owned partitions should be paused, when in state " + state);
+ throw new IllegalStateException("All owned partitions should be paused, when in state " + channelState);
}
}
}
Mono<ChatRoomData> getChatRoomData(int shard, UUID id)
{
- State capturedState = state;
- if (capturedState != State.READY)
+ ChannelState capturedState = channelState;
+ if (capturedState != ChannelState.READY)
{
return Mono.error(new ChannelNotReadyException(capturedState));
}
{
return consumer.groupMetadata();
}
-
- public enum State
- {
- STARTING,
- LOAD_IN_PROGRESS,
- READY,
- SHUTTING_DOWN
- }
}
import reactor.core.publisher.Mono;
import java.net.URI;
-import java.time.*;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
private boolean running;
@Getter
- private volatile boolean loadInProgress = true;
+ private volatile ChannelState channelState = ChannelState.STARTING;
public InfoChannel(
IntStream
.range(0, numShards)
.forEach(partition -> this.nextOffset[partition] = 0l);
- loadInProgress = true;
+ channelState = ChannelState.LOAD_IN_PROGRESS;
while (running)
{
private void updateNextOffset(int partition, long nextOffset)
{
this.nextOffset[partition] = nextOffset;
- if (loadInProgress) {
- loadInProgress = IntStream
+ if (channelState == ChannelState.LOAD_IN_PROGRESS)
+ {
+ boolean loadInProgress = IntStream
.range(0, numShards)
.anyMatch(shard -> this.nextOffset[shard] < currentOffset[partition]);
+ if (!loadInProgress)
+ {
+ channelState = ChannelState.READY;
+ }
}
}
Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
{
- if (loadInProgress)
+ ChannelState capturedState = channelState;
+ if (capturedState != ChannelState.READY)
{
- return Mono.error(new ChannelNotReadyException());
+ return Mono.error(new ChannelNotReadyException(capturedState));
}
return Mono.fromSupplier(() -> chatRoomInfo.get(id));