package de.juplo.kafka.chat.backend.implementation.kafka;
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
private final String instanceUri;
private boolean running;
+ @Getter
+ private volatile boolean loadInProgress = true;
public InfoChannel(
}
- boolean isLoadInProgress()
- {
- return IntStream
- .range(0, numShards)
- .anyMatch(partition -> nextOffset[partition] < currentOffset[partition]);
- }
-
Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
UUID chatRoomId,
String name,
IntStream
.range(0, numShards)
.forEach(partition -> this.nextOffset[partition] = 0l);
+ loadInProgress = true;
while (running)
{
private void updateNextOffset(int partition, long nextOffset)
{
this.nextOffset[partition] = nextOffset;
+ if (loadInProgress) {
+ loadInProgress = IntStream
+ .range(0, numShards)
+ .anyMatch(shard -> this.nextOffset[shard] < currentOffset[partition]);
+ }
}
private void handleMessage(ConsumerRecord<String, AbstractMessageTo> record)
Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
{
+ if (loadInProgress)
+ {
+ return Mono.error(new LoadInProgressException());
+ }
+
return Mono.fromSupplier(() -> chatRoomInfo.get(id));
}