package de.juplo.kafka.chat.backend.implementation.kafka;
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
package de.juplo.kafka.chat.backend.implementation.kafka;
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
import lombok.Getter;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.WakeupException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.net.URI;
import org.apache.kafka.common.errors.WakeupException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.net.URI;
{
private final String topic;
private final Producer<String, AbstractMessageTo> producer;
{
private final String topic;
private final Producer<String, AbstractMessageTo> producer;
catch (WakeupException e)
{
log.info("Received WakeupException, exiting!");
catch (WakeupException e)
{
log.info("Received WakeupException, exiting!");
private void updateNextOffset(int partition, long nextOffset)
{
this.nextOffset[partition] = nextOffset;
private void updateNextOffset(int partition, long nextOffset)
{
this.nextOffset[partition] = nextOffset;
- .anyMatch(shard -> this.nextOffset[shard] < currentOffset[partition]);
+ .anyMatch(shard -> this.nextOffset[shard] < currentOffset[shard]);
+ if (!loadInProgress)
+ {
+ log.info("Loading of info completed! Resuming normal operations...");
+ channelState = ChannelState.READY;
+ }