fix: Errors during shard-publishing should not kill the instance
authorKai Moritz <kai@juplo.de>
Mon, 26 Feb 2024 18:55:19 +0000 (19:55 +0100)
committerKai Moritz <kai@juplo.de>
Mon, 26 Feb 2024 19:41:55 +0000 (20:41 +0100)
* `HaproxyShardingPublisherStrategy` has to transform any exception into
  a `Mono.error()`.
* `DataChannel.onPartitionsAssigned(..)` has to log and swallow errors
  during the propagation of the shard-ownership.

src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java

index 3caaeb3..ad71d49 100644 (file)
@@ -5,7 +5,6 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Mono;
 
-import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
@@ -33,7 +32,7 @@ public class HaproxyShardingPublisherStrategy implements ShardingPublisherStrate
       socketChannel.close();
       return Mono.just(instanceId);
     }
-    catch (IOException e)
+    catch (Exception e)
     {
       return Mono.error(e);
     }
index fdb16fb..b4cc33f 100644 (file)
@@ -155,7 +155,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
               "Could not publish instance {} as owner of shard {}: {}",
               instanceId,
               partition,
-              throwable))
+              throwable.toString()))
+          .onErrorComplete()
           .block();
     });