]> juplo.de Git - demos/kafka/chat/commitdiff
feat: The publishing of the owernship is retried, if it fails
authorKai Moritz <kai@juplo.de>
Sat, 23 Mar 2024 12:24:43 +0000 (13:24 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 24 Mar 2024 19:28:56 +0000 (20:28 +0100)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java

index 9be75533e2d532cc04ef3ad0a7d4bb14ba726694..0ead9ef96071e7345a7477202909ca934a778b7a 100644 (file)
@@ -16,6 +16,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
 
 import java.time.*;
 import java.util.Collection;
@@ -154,6 +155,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
       channelMediator.shardAssigned(partition);
       shardingPublisherStrategy
           .publishOwnership(partition)
+          .retryWhen(Retry.backoff(5, Duration.ofSeconds(1)))
           .doOnError(throwable -> log.error(
               "Could not publish instance {} as owner of shard {}: {}",
               instanceId,