X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FInfoChannel.java;h=e6c18c779e9238890832dd4632bcf717a0e84401;hb=56cb19c83265e6007668e2747221ac47b1388732;hp=3fe15c44a61410f17a8b2723c92e77fb2e104cce;hpb=284e5c04e30bcfecfe2bca1252d247dbc6baaecd;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index 3fe15c44..e6c18c77 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -5,6 +5,7 @@ import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessage import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned; import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -38,6 +39,8 @@ public class InfoChannel implements Runnable private final String instanceUri; private boolean running; + @Getter + private volatile boolean loadInProgress = true; public InfoChannel( @@ -68,13 +71,6 @@ public class InfoChannel implements Runnable } - boolean isLoadInProgress() - { - return IntStream - .range(0, numShards) - .anyMatch(partition -> nextOffset[partition] < currentOffset[partition]); - } - Mono sendChatRoomCreatedEvent( UUID chatRoomId, String name, @@ -189,6 +185,7 @@ public class InfoChannel implements Runnable IntStream .range(0, numShards) .forEach(partition -> this.nextOffset[partition] = 0l); + loadInProgress = true; while (running) { @@ -215,6 +212,11 @@ public class InfoChannel implements Runnable private void updateNextOffset(int partition, long nextOffset) { this.nextOffset[partition] = nextOffset; + if (loadInProgress) { + loadInProgress = IntStream + .range(0, numShards) + .anyMatch(shard -> this.nextOffset[shard] < currentOffset[partition]); + } } private void handleMessage(ConsumerRecord record)