From: Kai Moritz Date: Mon, 26 Feb 2024 18:55:19 +0000 (+0100) Subject: fix: Errors during shard-publishing should not kill the instance X-Git-Tag: rebase--2024-02-27--11-49~19 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=13f86063f851fc2c4ad6de56c8edb78bff9d0592;p=demos%2Fkafka%2Fchat fix: Errors during shard-publishing should not kill the instance * `HaproxyShardingPublisherStrategy` has to transform any exception into a `Mono.error()`. * `DataChannel.onPartitionsAssigned(..)` has to log and swallow errors during the propagation of the shard-ownership. --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java index 3caaeb38..ad71d497 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java @@ -5,7 +5,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Mono; -import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; @@ -33,7 +32,7 @@ public class HaproxyShardingPublisherStrategy implements ShardingPublisherStrate socketChannel.close(); return Mono.just(instanceId); } - catch (IOException e) + catch (Exception e) { return Mono.error(e); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index fdb16fbd..b4cc33f5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -155,7 +155,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener "Could not publish instance {} as owner of shard {}: {}", instanceId, partition, - throwable)) + throwable.toString())) + .onErrorComplete() .block(); });