From: Kai Moritz Date: Mon, 5 Feb 2024 19:19:44 +0000 (+0100) Subject: WIP X-Git-Tag: rebase--2024-02-18--17-42~1 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=f4b1a26fc24afca704965d4f88128d60ffae24da;p=demos%2Fkafka%2Fchat WIP --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java index a7fd9d2e..d77f0d8a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java @@ -9,6 +9,7 @@ import reactor.core.publisher.Flux; import java.util.UUID; import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -18,8 +19,17 @@ public interface StorageStrategy { Logger log = LoggerFactory.getLogger(StorageStrategy.class.getCanonicalName()); + AtomicBoolean running = new AtomicBoolean(true); + + default void write(ChatHomeService chatHomeService) { + if (!running.getAndSet(false)) + { + log.info("{} is not running, skip write...", chatHomeService); + return; + } + Phaser writtenChatRooms = new Phaser(1); AtomicInteger numErrors = new AtomicInteger(); diff --git a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbConfigurationIT.java index 69377369..c7ede36b 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbConfigurationIT.java @@ -37,7 +37,7 @@ class InMemoryWithMongoDbConfigurationIT extends AbstractConfigurationIT @DynamicPropertySource static void addMongoPortProperty(DynamicPropertyRegistry registry) { - registry.add("spring.data.mongodb.port", () -> CONTAINER.getMappedPort(27017)); + registry.add("spring.data.mongodb.port", () -> CONTAINER.getMappedPort(MONGODB_PORT)); } @BeforeEach