From: Kai Moritz Date: Sat, 23 Mar 2024 12:24:43 +0000 (+0100) Subject: feat: The publishing of the owernship is retried, if it fails X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=979f12d0e32ec39f1f5a9dea5f09f64a3c6654d5;p=demos%2Fkafka%2Fchat feat: The publishing of the owernship is retried, if it fails --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 9be75533..0ead9ef9 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -16,6 +16,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; import java.time.*; import java.util.Collection; @@ -154,6 +155,7 @@ public class DataChannel implements Channel, ConsumerRebalanceListener channelMediator.shardAssigned(partition); shardingPublisherStrategy .publishOwnership(partition) + .retryWhen(Retry.backoff(5, Duration.ofSeconds(1))) .doOnError(throwable -> log.error( "Could not publish instance {} as owner of shard {}: {}", instanceId,