WAS:TMP:IS?FIX:WIP:test: `*ConfigurationIT` asserts, if restored messages can be...
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / DataChannel.java
index 63f36f5..8dcc1bc 100644 (file)
@@ -176,6 +176,9 @@ public class DataChannel implements Channel, ConsumerRebalanceListener
     partitions.forEach(topicPartition ->
     {
       int partition = topicPartition.partition();
+      chatRoomData[partition]
+          .values()
+          .forEach(chatRoomData -> chatRoomData.close());
       isShardOwned[partition] = false;
       nextOffset[partition] = consumer.position(topicPartition);
       log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);