From 13f86063f851fc2c4ad6de56c8edb78bff9d0592 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 26 Feb 2024 19:55:19 +0100 Subject: [PATCH] fix: Errors during shard-publishing should not kill the instance * `HaproxyShardingPublisherStrategy` has to transform any exception into a `Mono.error()`. * `DataChannel.onPartitionsAssigned(..)` has to log and swallow errors during the propagation of the shard-ownership. --- .../haproxy/HaproxyShardingPublisherStrategy.java | 3 +-- .../kafka/chat/backend/implementation/kafka/DataChannel.java | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java index 3caaeb38..ad71d497 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java @@ -5,7 +5,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Mono; -import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; @@ -33,7 +32,7 @@ public class HaproxyShardingPublisherStrategy implements ShardingPublisherStrate socketChannel.close(); return Mono.just(instanceId); } - catch (IOException e) + catch (Exception e) { return Mono.error(e); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index fdb16fbd..b4cc33f5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -155,7 +155,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener "Could not publish instance {} as owner of shard {}: {}", instanceId, partition, - throwable)) + throwable.toString())) + .onErrorComplete() .block(); }); -- 2.20.1