X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FInfoChannel.java;h=95f7fb064b1271459102f79cd9f84dd9a245b61e;hb=8300dcd98f681893a077051560151a8f1b94e38d;hp=a6351d0ff52354241c57bf63b3a68c9956682d0a;hpb=e73b46955f6b7aff3aa1ecb79b2aee75fa76b097;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index a6351d0f..95f7fb06 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -1,33 +1,33 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; -import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned; import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked; import lombok.Getter; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.errors.WakeupException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.net.URI; -import java.time.*; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.stream.IntStream; +@ToString(of = { "topic", "instanceUri" }) @Slf4j -public class InfoChannel implements Runnable +public class InfoChannel implements Channel { private final String topic; private final Producer producer; @@ -39,10 +39,11 @@ public class InfoChannel implements Runnable private final long[] nextOffset; private final Map chatRoomInfo; private final String instanceUri; + private final ChannelMediator channelMediator; private boolean running; @Getter - private volatile boolean loadInProgress = true; + private volatile ChannelState channelState = ChannelState.STARTING; public InfoChannel( @@ -51,7 +52,8 @@ public class InfoChannel implements Runnable Consumer infoChannelConsumer, Duration pollingInterval, int numShards, - URI instanceUri) + URI instanceUri, + ChannelMediator channelMediator) { log.debug( "Creating InfoChannel for topic {}", @@ -72,6 +74,8 @@ public class InfoChannel implements Runnable .forEach(partition -> this.nextOffset[partition] = -1l); this.instanceUri = instanceUri.toASCIIString(); + + this.channelMediator = channelMediator; } @@ -189,7 +193,7 @@ public class InfoChannel implements Runnable IntStream .range(0, numShards) .forEach(partition -> this.nextOffset[partition] = 0l); - loadInProgress = true; + channelState = ChannelState.LOAD_IN_PROGRESS; while (running) { @@ -206,6 +210,7 @@ public class InfoChannel implements Runnable catch (WakeupException e) { log.info("Received WakeupException, exiting!"); + channelState = ChannelState.SHUTTING_DOWN; running = false; } } @@ -216,10 +221,16 @@ public class InfoChannel implements Runnable private void updateNextOffset(int partition, long nextOffset) { this.nextOffset[partition] = nextOffset; - if (loadInProgress) { - loadInProgress = IntStream + if (channelState == ChannelState.LOAD_IN_PROGRESS) + { + boolean loadInProgress = IntStream .range(0, numShards) - .anyMatch(shard -> this.nextOffset[shard] < currentOffset[partition]); + .anyMatch(shard -> this.nextOffset[shard] < currentOffset[shard]); + if (!loadInProgress) + { + log.info("Loading of info completed! Resuming normal operations..."); + channelState = ChannelState.READY; + } } } @@ -282,6 +293,7 @@ public class InfoChannel implements Runnable chatRoomId); this.chatRoomInfo.put(chatRoomId, chatRoomInfo); + this.channelMediator.chatRoomCreated(chatRoomInfo); } } @@ -292,9 +304,10 @@ public class InfoChannel implements Runnable Mono getChatRoomInfo(UUID id) { - if (loadInProgress) + ChannelState capturedState = channelState; + if (capturedState != ChannelState.READY) { - return Mono.error(new LoadInProgressException()); + return Mono.error(new ChannelNotReadyException(capturedState)); } return Mono.fromSupplier(() -> chatRoomInfo.get(id));