package de.juplo.kafka.chat.backend.implementation.kafka;
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
import lombok.Getter;
+import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.WakeupException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.net.URI;
-import java.time.*;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.stream.IntStream;
+@ToString(of = { "topic", "instanceUri" })
@Slf4j
-public class InfoChannel implements Runnable
+public class InfoChannel implements Channel
{
private final String topic;
private final Producer<String, AbstractMessageTo> producer;
private final Consumer<String, AbstractMessageTo> consumer;
+ private final Duration pollingInterval;
private final int numShards;
private final String[] shardOwners;
private final long[] currentOffset;
private final long[] nextOffset;
private final Map<UUID, ChatRoomInfo> chatRoomInfo;
private final String instanceUri;
+ private final ChannelMediator channelMediator;
private boolean running;
@Getter
- private volatile boolean loadInProgress = true;
+ private volatile ChannelState channelState = ChannelState.STARTING;
public InfoChannel(
String topic,
Producer<String, AbstractMessageTo> producer,
Consumer<String, AbstractMessageTo> infoChannelConsumer,
+ Duration pollingInterval,
int numShards,
- URI instanceUri)
+ URI instanceUri,
+ ChannelMediator channelMediator)
{
log.debug(
"Creating InfoChannel for topic {}",
this.producer = producer;
this.chatRoomInfo = new HashMap<>();
+ this.pollingInterval = pollingInterval;
+
this.numShards = numShards;
this.shardOwners = new String[numShards];
this.currentOffset = new long[numShards];
.forEach(partition -> this.nextOffset[partition] = -1l);
this.instanceUri = instanceUri.toASCIIString();
+
+ this.channelMediator = channelMediator;
}
IntStream
.range(0, numShards)
.forEach(partition -> this.nextOffset[partition] = 0l);
- loadInProgress = true;
+ channelState = ChannelState.LOAD_IN_PROGRESS;
while (running)
{
try
{
- ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
+ ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(pollingInterval);
log.debug("Fetched {} messages", records.count());
for (ConsumerRecord<String, AbstractMessageTo> record : records)
{
catch (WakeupException e)
{
log.info("Received WakeupException, exiting!");
+ channelState = ChannelState.SHUTTING_DOWN;
running = false;
}
}
private void updateNextOffset(int partition, long nextOffset)
{
this.nextOffset[partition] = nextOffset;
- if (loadInProgress) {
- loadInProgress = IntStream
+ if (channelState == ChannelState.LOAD_IN_PROGRESS)
+ {
+ boolean loadInProgress = IntStream
.range(0, numShards)
.anyMatch(shard -> this.nextOffset[shard] < currentOffset[partition]);
+ if (!loadInProgress)
+ {
+ channelState = ChannelState.READY;
+ }
}
}
chatRoomId);
this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
+ this.channelMediator.chatRoomCreated(chatRoomInfo);
}
}
Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
{
- if (loadInProgress)
+ ChannelState capturedState = channelState;
+ if (capturedState != ChannelState.READY)
{
- return Mono.error(new LoadInProgressException());
+ return Mono.error(new ChannelNotReadyException(capturedState));
}
return Mono.fromSupplier(() -> chatRoomInfo.get(id));