From 2ab23f9540afe042c2f6b30c0ab9ea988acacc82 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 22 Feb 2024 14:39:30 +0100 Subject: [PATCH 01/16] test: Simplified & Unified the Kafka-tests --- .../chat/backend/domain/ChatHomeServiceTest.java | 2 ++ .../implementation/kafka/KafkaTestUtils.java | 15 +-------------- 2 files changed, 3 insertions(+), 14 deletions(-) diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceTest.java index 004c35f7..858d03d3 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceTest.java @@ -4,6 +4,7 @@ import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException; import de.juplo.kafka.chat.backend.implementation.inmemory.InMemoryServicesConfiguration; +import de.juplo.kafka.chat.backend.implementation.kafka.KafkaServicesConfiguration; import de.juplo.kafka.chat.backend.storage.files.FilesStorageConfiguration; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -24,6 +25,7 @@ import static pl.rzrz.assertj.reactor.Assertions.assertThat; @SpringJUnitConfig(classes = { InMemoryServicesConfiguration.class, FilesStorageConfiguration.class, + KafkaServicesConfiguration.class, ChatHomeServiceTest.TestConfiguration.class }) @EnableConfigurationProperties(ChatBackendProperties.class) public abstract class ChatHomeServiceTest diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java index 52a527d3..2ede2029 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java @@ -5,24 +5,17 @@ import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Import; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import reactor.core.publisher.Mono; -import java.time.Clock; import java.util.List; @Slf4j -public class KafkaTestUtils +public abstract class KafkaTestUtils { - @TestConfiguration - @EnableConfigurationProperties(ChatBackendProperties.class) - @Import(KafkaServicesConfiguration.class) public static class KafkaTestConfiguration { @Bean @@ -44,12 +37,6 @@ public class KafkaTestUtils dataChannel.onPartitionsAssigned(assignedPartitions); }; } - - @Bean - public Clock clock() - { - return Clock.systemDefaultZone(); - } } -- 2.20.1 From 15273e71b69725fc536c3f1cbd49a4f2cf8afefa Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 21 Feb 2024 18:30:01 +0100 Subject: [PATCH 02/16] test: Added IT for `ChatRoomRepository` and `MessageRepository` --- pom.xml | 7 +- .../storage/mongodb/ChatRoomRepositoryIT.java | 68 ++++++++++ .../storage/mongodb/MessageRepositoryIT.java | 117 ++++++++++++++++++ 3 files changed, 191 insertions(+), 1 deletion(-) create mode 100644 src/test/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomRepositoryIT.java create mode 100644 src/test/java/de/juplo/kafka/chat/backend/storage/mongodb/MessageRepositoryIT.java diff --git a/pom.xml b/pom.xml index 04fe3747..9f6db208 100644 --- a/pom.xml +++ b/pom.xml @@ -59,6 +59,11 @@ spring-boot-starter-test test + + org.springframework.boot + spring-boot-testcontainers + test + pl.rzrz assertj-reactor @@ -72,7 +77,7 @@ org.testcontainers - testcontainers + mongodb test diff --git a/src/test/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomRepositoryIT.java b/src/test/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomRepositoryIT.java new file mode 100644 index 00000000..5c6e10cf --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomRepositoryIT.java @@ -0,0 +1,68 @@ +package de.juplo.kafka.chat.backend.storage.mongodb; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.testcontainers.service.connection.ServiceConnection; +import org.springframework.data.domain.Example; + +import org.testcontainers.containers.MongoDBContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import static pl.rzrz.assertj.reactor.Assertions.assertThat; + + +@SpringBootTest( + webEnvironment = SpringBootTest.WebEnvironment.NONE, + properties = { + "spring.data.mongodb.host=localhost", + "spring.data.mongodb.database=test", + "chat.backend.inmemory.storage-strategy=mongodb" }) +@Testcontainers +public class ChatRoomRepositoryIT +{ + @Container + @ServiceConnection + static MongoDBContainer MONGODB = new MongoDBContainer("mongo:6"); + + @Autowired + ChatRoomRepository repository; + + ChatRoomTo a, b, c; + + @BeforeEach + public void setUp() + { + repository.deleteAll(); + + a = repository.save(new ChatRoomTo("a", "foo")); + b = repository.save(new ChatRoomTo("b", "bar")); + c = repository.save(new ChatRoomTo("c", "bar")); + } + + @Test + public void findsAll() + { + assertThat(repository.findAll()).containsExactly(a, b, c); + } + + @Test + public void findsById() + { + assertThat(repository.findById("a")).contains(a); + assertThat(repository.findById("b")).contains(b); + assertThat(repository.findById("c")).contains(c); + assertThat(repository.findById("666")).isEmpty(); + } + + @Test + public void findsByExample() + { + assertThat(repository.findAll(Example.of(new ChatRoomTo(null, "foo")))).containsExactly(a); + assertThat(repository.findAll(Example.of(new ChatRoomTo(null, "bar")))).containsExactly(b, c); + assertThat(repository.findAll(Example.of(new ChatRoomTo(null, "foobar")))).isEmpty(); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/storage/mongodb/MessageRepositoryIT.java b/src/test/java/de/juplo/kafka/chat/backend/storage/mongodb/MessageRepositoryIT.java new file mode 100644 index 00000000..3e8a7160 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/storage/mongodb/MessageRepositoryIT.java @@ -0,0 +1,117 @@ +package de.juplo.kafka.chat.backend.storage.mongodb; + +import de.juplo.kafka.chat.backend.domain.Message; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.testcontainers.service.connection.ServiceConnection; +import org.springframework.data.domain.Example; +import org.testcontainers.containers.MongoDBContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.time.Clock; +import java.time.LocalDateTime; +import java.util.UUID; + +import static pl.rzrz.assertj.reactor.Assertions.assertThat; + + +@SpringBootTest( + webEnvironment = SpringBootTest.WebEnvironment.NONE, + properties = { + "spring.data.mongodb.host=localhost", + "spring.data.mongodb.database=test", + "chat.backend.inmemory.storage-strategy=mongodb" }) +@Testcontainers +public class MessageRepositoryIT +{ + @Container + @ServiceConnection + static MongoDBContainer MONGODB = new MongoDBContainer("mongo:6"); + + @Autowired + MessageRepository repository; + + UUID foo, bar; + MessageTo foo_1, foo_2, bar_1, bar_2, bar_3; + + @BeforeEach + public void setUp() + { + repository.deleteAll(); + + foo = UUID.randomUUID(); + bar = UUID.randomUUID(); + + long serial = 0; + LocalDateTime now = LocalDateTime.now(Clock.systemDefaultZone()); + + foo_1 = repository.save(MessageTo.from( + foo, + new Message( + Message.MessageKey.of("peter", 1l), + serial++, + now.plusSeconds(serial), + "Nachricht #" + serial))); + foo_2 = repository.save(MessageTo.from( + foo, + new Message( + Message.MessageKey.of("ute", 2l), + serial++, + now.plusSeconds(serial), + "Nachricht #" + serial))); + bar_1 = repository.save(MessageTo.from( + bar, + new Message( + Message.MessageKey.of("klaus", 1l), + serial++, + now.plusSeconds(serial), + "Nachricht #" + serial))); + bar_2 = repository.save(MessageTo.from( + bar, + new Message( + Message.MessageKey.of("beate", 2l), + serial++, + now.plusSeconds(serial), + "Nachricht #" + serial))); + bar_3 = repository.save(MessageTo.from( + bar, + new Message( + Message.MessageKey.of("peter", 3l), + serial++, + now.plusSeconds(serial), + "Nachricht #" + serial))); + } + + @Test + public void findsAll() + { + assertThat(repository.findAll()).containsExactly(foo_1, foo_2, bar_1, bar_2, bar_3); + } + + @Test + public void findsByExample_chatRoomId() + { + assertThat(repository.findAll(Example.of(new MessageTo(foo.toString(), null, null, null, null, null)))).containsExactly(foo_1, foo_2); + assertThat(repository.findAll(Example.of(new MessageTo(bar.toString(), null, null, null, null, null)))).containsExactly(bar_1, bar_2, bar_3); + } + + @Test + public void findsByExample_user() + { + assertThat(repository.findAll(Example.of(new MessageTo(null, "peter", null, null, null, null)))).containsExactly(foo_1, bar_3); + assertThat(repository.findAll(Example.of(new MessageTo(null, "klaus", null, null, null, null)))).containsExactly(bar_1); + assertThat(repository.findAll(Example.of(new MessageTo(null, "ute", null, null, null, null)))).containsExactly(foo_2); + assertThat(repository.findAll(Example.of(new MessageTo(null, "beate", null, null, null, null)))).containsExactly(bar_2); + } + + @Test + public void findsByExample_id() + { + assertThat(repository.findAll(Example.of(new MessageTo(null, null, 1l, null, null, null)))).containsExactly(foo_1, bar_1); + assertThat(repository.findAll(Example.of(new MessageTo(null, null, 2l, null, null, null)))).containsExactly(foo_2, bar_2); + assertThat(repository.findAll(Example.of(new MessageTo(null, null, 3l, null, null, null)))).containsExactly(bar_3); + } +} -- 2.20.1 From b4ba2e35e606a6463e9842a15c9f77b97435ecb5 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 22 Feb 2024 14:49:46 +0100 Subject: [PATCH 03/16] test: Simplified `InMemoryWithMongoDbStorageIT` * Switched to `@ServiceConnection` instead of hand-coded initializer. --- .../backend/InMemoryWithMongoDbStorageIT.java | 30 ++++--------------- 1 file changed, 5 insertions(+), 25 deletions(-) diff --git a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbStorageIT.java index 7566f521..7055af54 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbStorageIT.java @@ -1,6 +1,5 @@ package de.juplo.kafka.chat.backend; -import de.juplo.kafka.chat.backend.InMemoryWithMongoDbStorageIT.DataSourceInitializer; import de.juplo.kafka.chat.backend.implementation.StorageStrategy; import de.juplo.kafka.chat.backend.storage.mongodb.ChatRoomRepository; import de.juplo.kafka.chat.backend.storage.mongodb.MessageRepository; @@ -12,13 +11,11 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.ApplicationContextInitializer; -import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.boot.testcontainers.service.connection.ServiceConnection; import org.springframework.context.annotation.Bean; -import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit.jupiter.SpringExtension; -import org.springframework.test.context.support.TestPropertySourceUtils; import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.MongoDBContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -30,7 +27,6 @@ import java.time.Clock; @ExtendWith({SpringExtension.class}) @EnableAutoConfiguration @AutoConfigureDataMongo -@ContextConfiguration(initializers = DataSourceInitializer.class) @Slf4j public class InMemoryWithMongoDbStorageIT extends AbstractInMemoryStorageIT { @@ -72,31 +68,15 @@ public class InMemoryWithMongoDbStorageIT extends AbstractInMemoryStorageIT } } - private static final int MONGODB_PORT = 27017; - @Container - private static final GenericContainer CONTAINER = - new GenericContainer("mongo:6").withExposedPorts(MONGODB_PORT); - - public static class DataSourceInitializer - implements ApplicationContextInitializer - { - @Override - public void initialize(ConfigurableApplicationContext applicationContext) - { - TestPropertySourceUtils.addInlinedPropertiesToEnvironment( - applicationContext, - "spring.data.mongodb.host=localhost", - "spring.data.mongodb.port=" + CONTAINER.getMappedPort(MONGODB_PORT), - "spring.data.mongodb.database=test"); - } - } + @ServiceConnection + private static final GenericContainer MONGODB = new MongoDBContainer("mongo:6"); @BeforeEach void setUpLogging() { Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(log); - CONTAINER.followOutput(logConsumer); + MONGODB.followOutput(logConsumer); chatRoomRepository.deleteAll(); messageRepository.deleteAll(); } -- 2.20.1 From 47d4a54aaa1ce13180ab6fdad0569cf2d0851563 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 22 Feb 2024 15:35:39 +0100 Subject: [PATCH 04/16] test: Simplified the integration-tests for `StorageStrategy` --- .../backend/AbstractInMemoryStorageIT.java | 30 +++------ .../backend/AbstractStorageStrategyIT.java | 34 +++++++--- .../backend/InMemoryWithFilesStorageIT.java | 45 +++---------- .../backend/InMemoryWithMongoDbStorageIT.java | 64 ++++--------------- .../inmemory/InMemoryTestUtils.java | 31 +++++++++ 5 files changed, 90 insertions(+), 114 deletions(-) create mode 100644 src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryTestUtils.java diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractInMemoryStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractInMemoryStorageIT.java index 3fea43ed..0ec0bc19 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/AbstractInMemoryStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/AbstractInMemoryStorageIT.java @@ -1,36 +1,26 @@ package de.juplo.kafka.chat.backend; import de.juplo.kafka.chat.backend.domain.ChatHomeService; -import de.juplo.kafka.chat.backend.implementation.inmemory.SimpleChatHomeService; -import lombok.RequiredArgsConstructor; +import de.juplo.kafka.chat.backend.implementation.StorageStrategy; +import de.juplo.kafka.chat.backend.implementation.inmemory.InMemoryServicesConfiguration; +import de.juplo.kafka.chat.backend.implementation.inmemory.InMemoryTestUtils; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; import java.time.Clock; -@RequiredArgsConstructor +@ContextConfiguration(classes = InMemoryTestUtils.class) @Slf4j public abstract class AbstractInMemoryStorageIT extends AbstractStorageStrategyIT { - final Clock clock; + @Autowired + InMemoryTestUtils testUtils; @Override - protected StorageStrategyITConfig getConfig() + ChatHomeService getChatHome() { - return new StorageStrategyITConfig() - { - int bufferSize = 8; - - SimpleChatHomeService simpleChatHome = new SimpleChatHomeService( - getStorageStrategy(), - clock, - bufferSize); - - @Override - public ChatHomeService getChatHome() - { - return simpleChatHome; - } - }; + return testUtils.createNoneShardingChatHomeService(); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java index 41e80ed7..9568545a 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java @@ -2,33 +2,47 @@ package de.juplo.kafka.chat.backend; import de.juplo.kafka.chat.backend.domain.*; import de.juplo.kafka.chat.backend.implementation.StorageStrategy; +import de.juplo.kafka.chat.backend.implementation.inmemory.InMemoryServicesConfiguration; +import de.juplo.kafka.chat.backend.storage.files.FilesStorageConfiguration; +import de.juplo.kafka.chat.backend.storage.mongodb.MongoDbStorageConfiguration; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import java.time.Clock; import java.util.List; import java.util.UUID; import static pl.rzrz.assertj.reactor.Assertions.*; +@SpringJUnitConfig(classes = { + InMemoryServicesConfiguration.class, + FilesStorageConfiguration.class, + MongoDbStorageConfiguration.class, + AbstractStorageStrategyIT.TestConfig.class }) +@EnableConfigurationProperties(ChatBackendProperties.class) @Slf4j public abstract class AbstractStorageStrategyIT { - protected ChatHomeService chathome; + ChatHomeService chathome; + @Autowired + StorageStrategy storageStrategy; - protected abstract StorageStrategy getStorageStrategy(); - protected abstract StorageStrategyITConfig getConfig(); + abstract ChatHomeService getChatHome(); protected void start() { - StorageStrategyITConfig config = getConfig(); - chathome = config.getChatHome(); + chathome = getChatHome(); } protected void stop() { - getStorageStrategy() + storageStrategy .write(chathome) .subscribe(); } @@ -115,8 +129,12 @@ public abstract class AbstractStorageStrategyIT } - interface StorageStrategyITConfig + static class TestConfig { - ChatHomeService getChatHome(); + @Bean + Clock clock() + { + return Clock.systemDefaultZone(); + } } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesStorageIT.java index 78f46250..e4eaf3ab 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesStorageIT.java @@ -1,54 +1,29 @@ package de.juplo.kafka.chat.backend; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import de.juplo.kafka.chat.backend.implementation.StorageStrategy; -import de.juplo.kafka.chat.backend.storage.files.FilesStorageStrategy; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeEach; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.TestPropertySource; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.time.Clock; -import java.util.logging.Level; +@TestPropertySource(properties = { + "chat.backend.inmemory.sharding-strategy=none", + "chat.backend.inmemory.storage-strategy=files", + "chat.backend.inmemory.storage-directory=target/files" }) @Slf4j public class InMemoryWithFilesStorageIT extends AbstractInMemoryStorageIT { - final static Path path = Paths.get("target","files"); - - final ObjectMapper mapper; - final FilesStorageStrategy storageStrategy; - - - public InMemoryWithFilesStorageIT() - { - super(Clock.systemDefaultZone()); - mapper = new ObjectMapper(); - mapper.registerModule(new JavaTimeModule()); - mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); - storageStrategy = new FilesStorageStrategy( - path, - chatRoomId -> 0, - mapper, - Level.FINE, - true); - } - - - @Override - protected StorageStrategy getStorageStrategy() - { - return storageStrategy; - } - @BeforeEach - void reset() throws Exception + void resetStorage( + @Autowired ChatBackendProperties properties) + throws Exception { + Path path = Paths.get(properties.getInmemory().getStorageDirectory()); if (Files.exists(path)) { Files diff --git a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbStorageIT.java index 7055af54..8f4e37a3 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbStorageIT.java @@ -1,83 +1,45 @@ package de.juplo.kafka.chat.backend; -import de.juplo.kafka.chat.backend.implementation.StorageStrategy; import de.juplo.kafka.chat.backend.storage.mongodb.ChatRoomRepository; import de.juplo.kafka.chat.backend.storage.mongodb.MessageRepository; -import de.juplo.kafka.chat.backend.storage.mongodb.MongoDbStorageStrategy; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; -import org.springframework.boot.test.context.TestConfiguration; import org.springframework.boot.testcontainers.service.connection.ServiceConnection; -import org.springframework.context.annotation.Bean; -import org.springframework.test.context.junit.jupiter.SpringExtension; +import org.springframework.test.context.TestPropertySource; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.MongoDBContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import java.time.Clock; - +@TestPropertySource(properties = { + "chat.backend.inmemory.sharding-strategy=none", + "chat.backend.inmemory.storage-strategy=mongodb" }) @Testcontainers -@ExtendWith({SpringExtension.class}) @EnableAutoConfiguration -@AutoConfigureDataMongo @Slf4j public class InMemoryWithMongoDbStorageIT extends AbstractInMemoryStorageIT { - @Autowired - MongoDbStorageStrategy storageStrategy; - @Autowired - ChatRoomRepository chatRoomRepository; - @Autowired - MessageRepository messageRepository; - - - public InMemoryWithMongoDbStorageIT() - { - super(Clock.systemDefaultZone()); - } - - - @Override - protected StorageStrategy getStorageStrategy() - { - return storageStrategy; - } - - @TestConfiguration - static class InMemoryWithMongoDbStorageStrategyITConfig - { - @Bean - MongoDbStorageStrategy storageStrategy( - ChatRoomRepository chatRoomRepository, - MessageRepository messageRepository) - { - return new MongoDbStorageStrategy(chatRoomRepository, messageRepository); - } - - @Bean - Clock clock() - { - return Clock.systemDefaultZone(); - } - } - @Container @ServiceConnection private static final GenericContainer MONGODB = new MongoDBContainer("mongo:6"); + @BeforeEach + void resetStorage( + @Autowired ChatRoomRepository chatRoomRepository, + @Autowired MessageRepository messageRepository) + { + chatRoomRepository.deleteAll(); + messageRepository.deleteAll(); + } + @BeforeEach void setUpLogging() { Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(log); MONGODB.followOutput(logConsumer); - chatRoomRepository.deleteAll(); - messageRepository.deleteAll(); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryTestUtils.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryTestUtils.java new file mode 100644 index 00000000..3f6cf9c0 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryTestUtils.java @@ -0,0 +1,31 @@ +package de.juplo.kafka.chat.backend.implementation.inmemory; + +import de.juplo.kafka.chat.backend.ChatBackendProperties; +import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.implementation.StorageStrategy; +import org.springframework.beans.factory.annotation.Autowired; + +import java.time.Clock; + + +public class InMemoryTestUtils +{ + private final InMemoryServicesConfiguration config = + new InMemoryServicesConfiguration(); + + @Autowired + ChatBackendProperties properties; + @Autowired + StorageStrategy storageStrategy; + @Autowired + Clock clock; + + + public ChatHomeService createNoneShardingChatHomeService() + { + return config.noneShardingChatHome( + properties, + storageStrategy, + clock); + } +} -- 2.20.1 From 803a9193c8b1ad1ec68f1818fc091291a60c8f4c Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 20 Feb 2024 16:12:00 +0100 Subject: [PATCH 05/16] refactor: RED - Refined success/error-handling for restore-operations * This innocent little change discloses a severe missconception in the implementation of the storage strategies. * The call to `Mono.block()`, though not really changing the behaviour during the restore-process, triggers a sanity-check from io.projectractor. --- .../inmemory/InMemoryChatMessageService.java | 7 ++++++- .../inmemory/SimpleChatHomeService.java | 15 ++++++++++----- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java index 8f3e4956..0345c008 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java @@ -20,7 +20,12 @@ public class InMemoryChatMessageService implements ChatMessageService { log.debug("Creating InMemoryChatMessageService"); messages = new LinkedHashMap<>(); - messageFlux.subscribe(message -> messages.put(message.getKey(), message)); + messageFlux + .doOnNext(message -> messages.put(message.getKey(), message)) + .then() + .doOnSuccess(empty -> log.info("Restored InMemoryChatMessageService")) + .doOnError(throwable -> log.error("Could not restore InMemoryChatMessageService")) + .block(); } @Override diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java index 93593d86..dfe8567a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java @@ -40,11 +40,12 @@ public class SimpleChatHomeService implements ChatHomeService Clock clock, int bufferSize) { - this.shard = shard; - log.info("Created {}", this); + log.debug("Creating SimpleChatHomeService"); + this.shard = shard; this.chatRoomInfo = new HashMap<>(); this.chatRoomData = new HashMap<>(); + storageStrategy .readChatRoomInfo() .filter(info -> @@ -62,8 +63,7 @@ public class SimpleChatHomeService implements ChatHomeService return false; } }) - .toStream() - .forEach(info -> + .doOnNext(info -> { UUID chatRoomId = info.getId(); chatRoomInfo.put(chatRoomId, info); @@ -75,7 +75,12 @@ public class SimpleChatHomeService implements ChatHomeService clock, new InMemoryChatMessageService(messageFlux), bufferSize)); - }); + }) + .then() + .doOnSuccess(empty -> log.info("Restored {}", this)) + .doOnError(throwable -> log.error("Could not restore {}", this)) + .block(); + this.clock = clock; this.bufferSize = bufferSize; } -- 2.20.1 From dc53848961fd5622f777621fd4140cb01c2c8739 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 22 Feb 2024 16:03:08 +0100 Subject: [PATCH 06/16] fix: GREEN - Fixed the restore-mechanism * The code of a reactive flow _must not_ call blocking functions. * In order to solve this, the restore-process is triggered explicitly after the creation of the classes. --- .../inmemory/InMemoryChatMessageService.java | 18 ++++++++--- .../InMemoryServicesConfiguration.java | 18 ++++++----- .../inmemory/SimpleChatHomeService.java | 30 ++++++++++--------- 3 files changed, 41 insertions(+), 25 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java index 0345c008..7d4b9b62 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java @@ -2,30 +2,40 @@ package de.juplo.kafka.chat.backend.implementation.inmemory; import de.juplo.kafka.chat.backend.domain.ChatMessageService; import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.implementation.StorageStrategy; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.LocalDateTime; import java.util.LinkedHashMap; +import java.util.UUID; @Slf4j public class InMemoryChatMessageService implements ChatMessageService { + private final UUID chatRoomId; private final LinkedHashMap messages; - public InMemoryChatMessageService(Flux messageFlux) + public InMemoryChatMessageService(UUID chatRoomId) { log.debug("Creating InMemoryChatMessageService"); + this.chatRoomId = chatRoomId; messages = new LinkedHashMap<>(); - messageFlux + } + + + Mono restore(StorageStrategy storageStrategy) + { + Flux messageFlux = storageStrategy.readChatRoomData(chatRoomId); + + return messageFlux .doOnNext(message -> messages.put(message.getKey(), message)) .then() .doOnSuccess(empty -> log.info("Restored InMemoryChatMessageService")) - .doOnError(throwable -> log.error("Could not restore InMemoryChatMessageService")) - .block(); + .doOnError(throwable -> log.error("Could not restore InMemoryChatMessageService")); } @Override diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java index 5b5785ea..518cf41b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java @@ -31,10 +31,11 @@ public class InMemoryServicesConfiguration StorageStrategy storageStrategy, Clock clock) { - return new SimpleChatHomeService( - storageStrategy, + SimpleChatHomeService chatHomeService = new SimpleChatHomeService( clock, properties.getChatroomBufferSize()); + chatHomeService.restore(storageStrategy).block(); + return chatHomeService; } @Bean @@ -51,11 +52,14 @@ public class InMemoryServicesConfiguration SimpleChatHomeService[] chatHomes = new SimpleChatHomeService[numShards]; IntStream .of(properties.getInmemory().getOwnedShards()) - .forEach(shard -> chatHomes[shard] = new SimpleChatHomeService( - shard, - storageStrategy, - clock, - properties.getChatroomBufferSize())); + .forEach(shard -> + { + SimpleChatHomeService service = chatHomes[shard] = new SimpleChatHomeService( + shard, + clock, + properties.getChatroomBufferSize()); + service.restore(storageStrategy).block(); + }); ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); return new ShardedChatHomeService( properties.getInstanceId(), diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java index dfe8567a..5c3fe2e5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java @@ -23,20 +23,17 @@ public class SimpleChatHomeService implements ChatHomeService public SimpleChatHomeService( - StorageStrategy storageStrategy, Clock clock, int bufferSize) { this( null, - storageStrategy, clock, bufferSize); } public SimpleChatHomeService( Integer shard, - StorageStrategy storageStrategy, Clock clock, int bufferSize) { @@ -45,8 +42,14 @@ public class SimpleChatHomeService implements ChatHomeService this.shard = shard; this.chatRoomInfo = new HashMap<>(); this.chatRoomData = new HashMap<>(); + this.clock = clock; + this.bufferSize = bufferSize; + } - storageStrategy + + Mono restore(StorageStrategy storageStrategy) + { + return storageStrategy .readChatRoomInfo() .filter(info -> { @@ -63,26 +66,25 @@ public class SimpleChatHomeService implements ChatHomeService return false; } }) - .doOnNext(info -> + .flatMap(info -> { UUID chatRoomId = info.getId(); + InMemoryChatMessageService chatMessageService = + new InMemoryChatMessageService(chatRoomId); + chatRoomInfo.put(chatRoomId, info); - Flux messageFlux = - storageStrategy.readChatRoomData(chatRoomId); chatRoomData.put( info.getId(), new ChatRoomData( clock, - new InMemoryChatMessageService(messageFlux), + chatMessageService, bufferSize)); + + return chatMessageService.restore(storageStrategy); }) .then() .doOnSuccess(empty -> log.info("Restored {}", this)) - .doOnError(throwable -> log.error("Could not restore {}", this)) - .block(); - - this.clock = clock; - this.bufferSize = bufferSize; + .doOnError(throwable -> log.error("Could not restore {}", this)); } @@ -90,7 +92,7 @@ public class SimpleChatHomeService implements ChatHomeService public Mono createChatRoom(UUID id, String name) { log.info("Creating ChatRoom with buffer-size {}", bufferSize); - ChatMessageService service = new InMemoryChatMessageService(Flux.empty()); + ChatMessageService service = new InMemoryChatMessageService(id); ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard); this.chatRoomInfo.put(id, chatRoomInfo); ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize); -- 2.20.1 From ed777a6fb33ffc8b2cdb6cf7df9cce51c39d44da Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 22 Feb 2024 16:46:27 +0100 Subject: [PATCH 07/16] test: `StorageStrategy`-IT are restoring instead of recreating --- .../InMemoryServicesConfiguration.java | 4 +-- .../inmemory/SimpleChatHomeService.java | 3 +++ .../kafka/KafkaServicesConfiguration.java | 2 +- .../backend/AbstractInMemoryStorageIT.java | 12 ++++----- .../backend/AbstractStorageStrategyIT.java | 26 ++++++++----------- .../inmemory/InMemoryTestUtils.java | 15 +++-------- 6 files changed, 25 insertions(+), 37 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java index 518cf41b..3f3d8884 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java @@ -26,7 +26,7 @@ public class InMemoryServicesConfiguration name = "sharding-strategy", havingValue = "none", matchIfMissing = true) - ChatHomeService noneShardingChatHome( + SimpleChatHomeService noneShardingChatHome( ChatBackendProperties properties, StorageStrategy storageStrategy, Clock clock) @@ -43,7 +43,7 @@ public class InMemoryServicesConfiguration prefix = "chat.backend.inmemory", name = "sharding-strategy", havingValue = "kafkalike") - ChatHomeService kafkalikeShardingChatHome( + ShardedChatHomeService kafkalikeShardingChatHome( ChatBackendProperties properties, StorageStrategy storageStrategy, Clock clock) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java index 5c3fe2e5..371d4a87 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java @@ -49,6 +49,9 @@ public class SimpleChatHomeService implements ChatHomeService Mono restore(StorageStrategy storageStrategy) { + chatRoomInfo.clear(); + chatRoomData.clear(); + return storageStrategy .readChatRoomInfo() .filter(info -> diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index f8bebd6d..b5a442f8 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -109,7 +109,7 @@ public class KafkaServicesConfiguration } @Bean - ChatHomeService kafkaChatHome( + KafkaChatHomeService kafkaChatHome( ChatBackendProperties properties, InfoChannel infoChannel, DataChannel dataChannel) diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractInMemoryStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractInMemoryStorageIT.java index 0ec0bc19..843b9490 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/AbstractInMemoryStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/AbstractInMemoryStorageIT.java @@ -1,15 +1,11 @@ package de.juplo.kafka.chat.backend; -import de.juplo.kafka.chat.backend.domain.ChatHomeService; -import de.juplo.kafka.chat.backend.implementation.StorageStrategy; -import de.juplo.kafka.chat.backend.implementation.inmemory.InMemoryServicesConfiguration; import de.juplo.kafka.chat.backend.implementation.inmemory.InMemoryTestUtils; +import de.juplo.kafka.chat.backend.implementation.inmemory.SimpleChatHomeService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; -import java.time.Clock; - @ContextConfiguration(classes = InMemoryTestUtils.class) @Slf4j @@ -17,10 +13,12 @@ public abstract class AbstractInMemoryStorageIT extends AbstractStorageStrategyI { @Autowired InMemoryTestUtils testUtils; + @Autowired + SimpleChatHomeService simpleChatHomeService; @Override - ChatHomeService getChatHome() + void restore() { - return testUtils.createNoneShardingChatHomeService(); + testUtils.restore(simpleChatHomeService).block(); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java index 9568545a..7c9c07c3 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java @@ -28,19 +28,15 @@ import static pl.rzrz.assertj.reactor.Assertions.*; @Slf4j public abstract class AbstractStorageStrategyIT { + @Autowired ChatHomeService chathome; - @Autowired StorageStrategy storageStrategy; - abstract ChatHomeService getChatHome(); - protected void start() - { - chathome = getChatHome(); - } + abstract void restore(); - protected void stop() + void store() { storageStrategy .write(chathome) @@ -48,9 +44,9 @@ public abstract class AbstractStorageStrategyIT } @Test - protected void testStoreAndRecreate() + void testStoreAndRecreate() { - start(); + restore(); assertThat(chathome.getChatRoomInfo().toStream()).hasSize(0); @@ -69,8 +65,8 @@ public abstract class AbstractStorageStrategyIT .getChatRoomData(chatRoomId) .flatMapMany(cr -> cr.getMessages())).emitsExactly(m1, m2, m3, m4); - stop(); - start(); + store(); + restore(); assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyElementsOf(List.of(info)); assertThat(chathome.getChatRoomInfo(chatRoomId)).emitsExactly(info); @@ -80,9 +76,9 @@ public abstract class AbstractStorageStrategyIT } @Test - protected void testStoreAndRecreateParallelChatRooms() + void testStoreAndRecreateParallelChatRooms() { - start(); + restore(); assertThat(chathome.getChatRoomInfo().toStream()).hasSize(0); @@ -114,8 +110,8 @@ public abstract class AbstractStorageStrategyIT .getChatRoomData(chatRoomBId) .flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4); - stop(); - start(); + store(); + restore(); assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyInAnyOrderElementsOf(List.of(infoA, infoB)); assertThat(chathome.getChatRoomInfo(chatRoomAId)).emitsExactly(infoA); diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryTestUtils.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryTestUtils.java index 3f6cf9c0..cfc8e728 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryTestUtils.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryTestUtils.java @@ -4,28 +4,19 @@ import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.implementation.StorageStrategy; import org.springframework.beans.factory.annotation.Autowired; +import reactor.core.publisher.Mono; import java.time.Clock; public class InMemoryTestUtils { - private final InMemoryServicesConfiguration config = - new InMemoryServicesConfiguration(); - - @Autowired - ChatBackendProperties properties; @Autowired StorageStrategy storageStrategy; - @Autowired - Clock clock; - public ChatHomeService createNoneShardingChatHomeService() + public Mono restore(SimpleChatHomeService simpleChatHomeService) { - return config.noneShardingChatHome( - properties, - storageStrategy, - clock); + return simpleChatHomeService.restore(storageStrategy); } } -- 2.20.1 From 4f2a6dfe5a1c620ad795328853c1be71a2581771 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 20 Feb 2024 16:14:22 +0100 Subject: [PATCH 08/16] feat: Added counting of restored instances --- .../inmemory/InMemoryChatMessageService.java | 7 ++++--- .../implementation/inmemory/SimpleChatHomeService.java | 7 ++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java index 7d4b9b62..5d5feb87 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatMessageService.java @@ -33,9 +33,10 @@ public class InMemoryChatMessageService implements ChatMessageService return messageFlux .doOnNext(message -> messages.put(message.getKey(), message)) - .then() - .doOnSuccess(empty -> log.info("Restored InMemoryChatMessageService")) - .doOnError(throwable -> log.error("Could not restore InMemoryChatMessageService")); + .count() + .doOnSuccess(count -> log.info("Restored InMemoryChatMessageService with {} messages", count)) + .doOnError(throwable -> log.error("Could not restore InMemoryChatMessageService")) + .then(); } @Override diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java index 371d4a87..d568a9b4 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java @@ -85,9 +85,10 @@ public class SimpleChatHomeService implements ChatHomeService return chatMessageService.restore(storageStrategy); }) - .then() - .doOnSuccess(empty -> log.info("Restored {}", this)) - .doOnError(throwable -> log.error("Could not restore {}", this)); + .count() + .doOnSuccess(count -> log.info("Restored {} with {} chat-rooms", this, count)) + .doOnError(throwable -> log.error("Could not restore {}", this)) + .then(); } -- 2.20.1 From 7107fd0b4795d95b6c7f49cf4e765b82c16d016e Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 20 Feb 2024 17:32:54 +0100 Subject: [PATCH 09/16] refactor: DRY for logging-category from io.projectreactor --- .../kafka/chat/backend/api/ChatBackendController.java | 3 ++- .../chat/backend/storage/files/FilesStorageStrategy.java | 9 +++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java index 47ae6a5d..6c7cff7d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java @@ -18,6 +18,7 @@ public class ChatBackendController { private final ChatHomeService chatHomeService; private final StorageStrategy storageStrategy; + private final String loggingCategory = ChatBackendController.class.getSimpleName(); private final Level loggingLevel; private final boolean showOperatorLine; @@ -133,7 +134,7 @@ public class ChatBackendController return chatRoomData .listen() .log( - ChatBackendController.class.getSimpleName(), + loggingCategory, loggingLevel, showOperatorLine) .map(message -> MessageTo.from(message)) diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java index aaa61598..7c1fef02 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageStrategy.java @@ -33,6 +33,7 @@ public class FilesStorageStrategy implements StorageStrategy private final Path storagePath; private final ShardingStrategy shardingStrategy; private final ObjectMapper mapper; + private final String loggingCategory = FilesStorageStrategy.class.getSimpleName(); private final Level loggingLevel; private final boolean showOperatorLine; @@ -53,7 +54,7 @@ public class FilesStorageStrategy implements StorageStrategy return chatRoomInfoFlux .log( - FilesStorageStrategy.class.getSimpleName(), + loggingCategory, loggingLevel, showOperatorLine) .doFirst(() -> @@ -107,7 +108,7 @@ public class FilesStorageStrategy implements StorageStrategy return Flux .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) .log( - FilesStorageStrategy.class.getSimpleName(), + loggingCategory, loggingLevel, showOperatorLine) .map(chatRoomInfoTo -> @@ -146,7 +147,7 @@ public class FilesStorageStrategy implements StorageStrategy return messageFlux .log( - FilesStorageStrategy.class.getSimpleName(), + loggingCategory, loggingLevel, showOperatorLine) .doFirst(() -> @@ -200,7 +201,7 @@ public class FilesStorageStrategy implements StorageStrategy return Flux .from(new JsonFilePublisher(chatroomPath(chatRoomId), mapper, type)) .log( - FilesStorageStrategy.class.getSimpleName(), + loggingCategory, loggingLevel, showOperatorLine) .map(MessageTo::toMessage); -- 2.20.1 From d4f83979aaa9cb12ecf97a428f8f04f35ae3547d Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 20 Feb 2024 17:35:14 +0100 Subject: [PATCH 10/16] feat: Added logging for io.projectreactor to `MongoDbStorageStrategy` --- .../mongodb/MongoDbStorageConfiguration.java | 10 ++++++++-- .../mongodb/MongoDbStorageStrategy.java | 20 +++++++++++++++++++ 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageConfiguration.java index 1b5803ac..7c5d9cb5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageConfiguration.java @@ -1,5 +1,6 @@ package de.juplo.kafka.chat.backend.storage.mongodb; +import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.implementation.ShardingStrategy; import de.juplo.kafka.chat.backend.implementation.StorageStrategy; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -17,8 +18,13 @@ public class MongoDbStorageConfiguration @Bean public StorageStrategy storageStrategy( ChatRoomRepository chatRoomRepository, - MessageRepository messageRepository) + MessageRepository messageRepository, + ChatBackendProperties properties) { - return new MongoDbStorageStrategy(chatRoomRepository, messageRepository); + return new MongoDbStorageStrategy( + chatRoomRepository, + messageRepository, + properties.getProjectreactor().getLoggingLevel(), + properties.getProjectreactor().isShowOperatorLine()); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java index fb040395..81696450 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java @@ -8,6 +8,7 @@ import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import java.util.UUID; +import java.util.logging.Level; @RequiredArgsConstructor @@ -16,12 +17,19 @@ public class MongoDbStorageStrategy implements StorageStrategy { private final ChatRoomRepository chatRoomRepository; private final MessageRepository messageRepository; + private final String loggingCategory = MongoDbStorageStrategy.class.getSimpleName(); + private final Level loggingLevel; + private final boolean showOperatorLine; @Override public Flux writeChatRoomInfo(Flux chatRoomInfoFlux) { return chatRoomInfoFlux + .log( + loggingCategory, + loggingLevel, + showOperatorLine) .map(ChatRoomTo::from) .map(chatRoomRepository::save) .map(ChatRoomTo::toChatRoomInfo); @@ -32,6 +40,10 @@ public class MongoDbStorageStrategy implements StorageStrategy { return Flux .fromIterable(chatRoomRepository.findAll()) + .log( + loggingCategory, + loggingLevel, + showOperatorLine) .map(ChatRoomTo::toChatRoomInfo); } @@ -39,6 +51,10 @@ public class MongoDbStorageStrategy implements StorageStrategy public Flux writeChatRoomData(UUID chatRoomId, Flux messageFlux) { return messageFlux + .log( + loggingCategory, + loggingLevel, + showOperatorLine) .map(message -> MessageTo.from(chatRoomId, message)) .map(messageRepository::save) .map(MessageTo::toMessage); @@ -49,6 +65,10 @@ public class MongoDbStorageStrategy implements StorageStrategy { return Flux .fromIterable(messageRepository.findByChatRoomIdOrderBySerialAsc(chatRoomId.toString())) + .log( + loggingCategory, + loggingLevel, + showOperatorLine) .map(MessageTo::toMessage); } } -- 2.20.1 From 2ba733abfef3caf7d663397827801f58a18050cf Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 4 Feb 2024 00:24:07 +0100 Subject: [PATCH 11/16] feat: Switched to `spring-boot-starter-data-mongodb-reactive` --- pom.xml | 2 +- .../files/FilesStorageConfiguration.java | 6 ++-- .../storage/mongodb/ChatRoomRepository.java | 4 +-- .../storage/mongodb/MessageRepository.java | 9 +++--- .../mongodb/MongoDbStorageStrategy.java | 12 +++---- .../NoStorageStorageConfiguration.java | 6 ++-- .../backend/AbstractStorageStrategyIT.java | 2 +- .../backend/InMemoryWithMongoDbStorageIT.java | 4 +-- .../storage/mongodb/ChatRoomRepositoryIT.java | 24 +++++++------- .../storage/mongodb/MessageRepositoryIT.java | 32 +++++++++---------- 10 files changed, 52 insertions(+), 49 deletions(-) diff --git a/pom.xml b/pom.xml index 9f6db208..e98a75e4 100644 --- a/pom.xml +++ b/pom.xml @@ -44,7 +44,7 @@ org.springframework.boot - spring-boot-starter-data-mongodb + spring-boot-starter-data-mongodb-reactive org.apache.kafka diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageConfiguration.java index 630c1fa9..72f8e7d5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageConfiguration.java @@ -6,7 +6,8 @@ import de.juplo.kafka.chat.backend.implementation.ShardingStrategy; import de.juplo.kafka.chat.backend.implementation.StorageStrategy; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.boot.autoconfigure.data.mongo.MongoRepositoriesAutoConfiguration; +import org.springframework.boot.autoconfigure.data.mongo.MongoReactiveDataAutoConfiguration; +import org.springframework.boot.autoconfigure.data.mongo.MongoReactiveRepositoriesAutoConfiguration; import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -21,7 +22,8 @@ import java.nio.file.Paths; @Configuration @EnableAutoConfiguration( exclude = { - MongoRepositoriesAutoConfiguration.class, + MongoReactiveDataAutoConfiguration.class, + MongoReactiveRepositoriesAutoConfiguration.class, MongoAutoConfiguration.class }) public class FilesStorageConfiguration { diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomRepository.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomRepository.java index d9f6a0f5..9262bf66 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomRepository.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomRepository.java @@ -1,8 +1,8 @@ package de.juplo.kafka.chat.backend.storage.mongodb; -import org.springframework.data.mongodb.repository.MongoRepository; +import org.springframework.data.mongodb.repository.ReactiveMongoRepository; -public interface ChatRoomRepository extends MongoRepository +public interface ChatRoomRepository extends ReactiveMongoRepository { } diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MessageRepository.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MessageRepository.java index 712f5a4a..db8503c3 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MessageRepository.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MessageRepository.java @@ -1,11 +1,10 @@ package de.juplo.kafka.chat.backend.storage.mongodb; -import org.springframework.data.mongodb.repository.MongoRepository; +import org.springframework.data.mongodb.repository.ReactiveMongoRepository; +import reactor.core.publisher.Flux; -import java.util.List; - -public interface MessageRepository extends MongoRepository +public interface MessageRepository extends ReactiveMongoRepository { - List findByChatRoomIdOrderBySerialAsc(String chatRoomId); + Flux findByChatRoomIdOrderBySerialAsc(String chatRoomId); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java index 81696450..13f3c0d6 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java @@ -31,15 +31,15 @@ public class MongoDbStorageStrategy implements StorageStrategy loggingLevel, showOperatorLine) .map(ChatRoomTo::from) - .map(chatRoomRepository::save) + .flatMap(chatRoomRepository::save) .map(ChatRoomTo::toChatRoomInfo); } @Override public Flux readChatRoomInfo() { - return Flux - .fromIterable(chatRoomRepository.findAll()) + return chatRoomRepository + .findAll() .log( loggingCategory, loggingLevel, @@ -56,15 +56,15 @@ public class MongoDbStorageStrategy implements StorageStrategy loggingLevel, showOperatorLine) .map(message -> MessageTo.from(chatRoomId, message)) - .map(messageRepository::save) + .flatMap(messageRepository::save) .map(MessageTo::toMessage); } @Override public Flux readChatRoomData(UUID chatRoomId) { - return Flux - .fromIterable(messageRepository.findByChatRoomIdOrderBySerialAsc(chatRoomId.toString())) + return messageRepository + .findByChatRoomIdOrderBySerialAsc(chatRoomId.toString()) .log( loggingCategory, loggingLevel, diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageConfiguration.java index 2e499ef3..dc2bdfcc 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageConfiguration.java @@ -3,7 +3,8 @@ package de.juplo.kafka.chat.backend.storage.nostorage; import de.juplo.kafka.chat.backend.implementation.StorageStrategy; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.boot.autoconfigure.data.mongo.MongoRepositoriesAutoConfiguration; +import org.springframework.boot.autoconfigure.data.mongo.MongoReactiveDataAutoConfiguration; +import org.springframework.boot.autoconfigure.data.mongo.MongoReactiveRepositoriesAutoConfiguration; import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -17,7 +18,8 @@ import org.springframework.context.annotation.Configuration; @Configuration @EnableAutoConfiguration( exclude = { - MongoRepositoriesAutoConfiguration.class, + MongoReactiveDataAutoConfiguration.class, + MongoReactiveRepositoriesAutoConfiguration.class, MongoAutoConfiguration.class }) public class NoStorageStorageConfiguration { diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java index 7c9c07c3..74c7c6fd 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/AbstractStorageStrategyIT.java @@ -40,7 +40,7 @@ public abstract class AbstractStorageStrategyIT { storageStrategy .write(chathome) - .subscribe(); + .block(); } @Test diff --git a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbStorageIT.java index 8f4e37a3..464513bd 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbStorageIT.java @@ -32,8 +32,8 @@ public class InMemoryWithMongoDbStorageIT extends AbstractInMemoryStorageIT @Autowired ChatRoomRepository chatRoomRepository, @Autowired MessageRepository messageRepository) { - chatRoomRepository.deleteAll(); - messageRepository.deleteAll(); + chatRoomRepository.deleteAll().block(); + messageRepository.deleteAll().block(); } @BeforeEach diff --git a/src/test/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomRepositoryIT.java b/src/test/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomRepositoryIT.java index 5c6e10cf..d0a7f6bd 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomRepositoryIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomRepositoryIT.java @@ -36,33 +36,33 @@ public class ChatRoomRepositoryIT @BeforeEach public void setUp() { - repository.deleteAll(); + repository.deleteAll().block(); - a = repository.save(new ChatRoomTo("a", "foo")); - b = repository.save(new ChatRoomTo("b", "bar")); - c = repository.save(new ChatRoomTo("c", "bar")); + a = repository.save(new ChatRoomTo("a", "foo")).block(); + b = repository.save(new ChatRoomTo("b", "bar")).block(); + c = repository.save(new ChatRoomTo("c", "bar")).block(); } @Test public void findsAll() { - assertThat(repository.findAll()).containsExactly(a, b, c); + assertThat(repository.findAll()).emitsExactly(a, b, c); } @Test public void findsById() { - assertThat(repository.findById("a")).contains(a); - assertThat(repository.findById("b")).contains(b); - assertThat(repository.findById("c")).contains(c); - assertThat(repository.findById("666")).isEmpty(); + assertThat(repository.findById("a")).emitsExactly(a); + assertThat(repository.findById("b")).emitsExactly(b); + assertThat(repository.findById("c")).emitsExactly(c); + assertThat(repository.findById("666")).emitsExactly(); } @Test public void findsByExample() { - assertThat(repository.findAll(Example.of(new ChatRoomTo(null, "foo")))).containsExactly(a); - assertThat(repository.findAll(Example.of(new ChatRoomTo(null, "bar")))).containsExactly(b, c); - assertThat(repository.findAll(Example.of(new ChatRoomTo(null, "foobar")))).isEmpty(); + assertThat(repository.findAll(Example.of(new ChatRoomTo(null, "foo")))).emitsExactly(a); + assertThat(repository.findAll(Example.of(new ChatRoomTo(null, "bar")))).emitsExactly(b, c); + assertThat(repository.findAll(Example.of(new ChatRoomTo(null, "foobar")))).emitsExactly(); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/storage/mongodb/MessageRepositoryIT.java b/src/test/java/de/juplo/kafka/chat/backend/storage/mongodb/MessageRepositoryIT.java index 3e8a7160..06fddb2f 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/storage/mongodb/MessageRepositoryIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/storage/mongodb/MessageRepositoryIT.java @@ -40,7 +40,7 @@ public class MessageRepositoryIT @BeforeEach public void setUp() { - repository.deleteAll(); + repository.deleteAll().block(); foo = UUID.randomUUID(); bar = UUID.randomUUID(); @@ -54,64 +54,64 @@ public class MessageRepositoryIT Message.MessageKey.of("peter", 1l), serial++, now.plusSeconds(serial), - "Nachricht #" + serial))); + "Nachricht #" + serial))).block(); foo_2 = repository.save(MessageTo.from( foo, new Message( Message.MessageKey.of("ute", 2l), serial++, now.plusSeconds(serial), - "Nachricht #" + serial))); + "Nachricht #" + serial))).block(); bar_1 = repository.save(MessageTo.from( bar, new Message( Message.MessageKey.of("klaus", 1l), serial++, now.plusSeconds(serial), - "Nachricht #" + serial))); + "Nachricht #" + serial))).block(); bar_2 = repository.save(MessageTo.from( bar, new Message( Message.MessageKey.of("beate", 2l), serial++, now.plusSeconds(serial), - "Nachricht #" + serial))); + "Nachricht #" + serial))).block(); bar_3 = repository.save(MessageTo.from( bar, new Message( Message.MessageKey.of("peter", 3l), serial++, now.plusSeconds(serial), - "Nachricht #" + serial))); + "Nachricht #" + serial))).block(); } @Test public void findsAll() { - assertThat(repository.findAll()).containsExactly(foo_1, foo_2, bar_1, bar_2, bar_3); + assertThat(repository.findAll()).emitsExactly(foo_1, foo_2, bar_1, bar_2, bar_3); } @Test public void findsByExample_chatRoomId() { - assertThat(repository.findAll(Example.of(new MessageTo(foo.toString(), null, null, null, null, null)))).containsExactly(foo_1, foo_2); - assertThat(repository.findAll(Example.of(new MessageTo(bar.toString(), null, null, null, null, null)))).containsExactly(bar_1, bar_2, bar_3); + assertThat(repository.findAll(Example.of(new MessageTo(foo.toString(), null, null, null, null, null)))).emitsExactly(foo_1, foo_2); + assertThat(repository.findAll(Example.of(new MessageTo(bar.toString(), null, null, null, null, null)))).emitsExactly(bar_1, bar_2, bar_3); } @Test public void findsByExample_user() { - assertThat(repository.findAll(Example.of(new MessageTo(null, "peter", null, null, null, null)))).containsExactly(foo_1, bar_3); - assertThat(repository.findAll(Example.of(new MessageTo(null, "klaus", null, null, null, null)))).containsExactly(bar_1); - assertThat(repository.findAll(Example.of(new MessageTo(null, "ute", null, null, null, null)))).containsExactly(foo_2); - assertThat(repository.findAll(Example.of(new MessageTo(null, "beate", null, null, null, null)))).containsExactly(bar_2); + assertThat(repository.findAll(Example.of(new MessageTo(null, "peter", null, null, null, null)))).emitsExactly(foo_1, bar_3); + assertThat(repository.findAll(Example.of(new MessageTo(null, "klaus", null, null, null, null)))).emitsExactly(bar_1); + assertThat(repository.findAll(Example.of(new MessageTo(null, "ute", null, null, null, null)))).emitsExactly(foo_2); + assertThat(repository.findAll(Example.of(new MessageTo(null, "beate", null, null, null, null)))).emitsExactly(bar_2); } @Test public void findsByExample_id() { - assertThat(repository.findAll(Example.of(new MessageTo(null, null, 1l, null, null, null)))).containsExactly(foo_1, bar_1); - assertThat(repository.findAll(Example.of(new MessageTo(null, null, 2l, null, null, null)))).containsExactly(foo_2, bar_2); - assertThat(repository.findAll(Example.of(new MessageTo(null, null, 3l, null, null, null)))).containsExactly(bar_3); + assertThat(repository.findAll(Example.of(new MessageTo(null, null, 1l, null, null, null)))).emitsExactly(foo_1, bar_1); + assertThat(repository.findAll(Example.of(new MessageTo(null, null, 2l, null, null, null)))).emitsExactly(foo_2, bar_2); + assertThat(repository.findAll(Example.of(new MessageTo(null, null, 3l, null, null, null)))).emitsExactly(bar_3); } } -- 2.20.1 From 49a8c8483e432a20a107d95771ba3806811b55a0 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 23 Feb 2024 11:39:26 +0100 Subject: [PATCH 12/16] fix: Disabled mongodb-autoconfig, if mongodb is not used --- .../backend/ChatBackendConfiguration.java | 16 ++++++++++++++++ .../files/FilesStorageConfiguration.java | 9 --------- .../NoStorageStorageConfiguration.java | 9 --------- .../backend/InMemoryWithFilesStorageIT.java | 19 +++++++++++++++++++ .../backend/domain/ChatHomeServiceTest.java | 10 ++++++++++ 5 files changed, 45 insertions(+), 18 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java index f49e385b..ad944fa3 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java @@ -1,5 +1,10 @@ package de.juplo.kafka.chat.backend; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.boot.autoconfigure.data.mongo.MongoReactiveDataAutoConfiguration; +import org.springframework.boot.autoconfigure.data.mongo.MongoReactiveRepositoriesAutoConfiguration; +import org.springframework.boot.autoconfigure.mongo.MongoReactiveAutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -16,4 +21,15 @@ public class ChatBackendConfiguration { return Clock.systemDefaultZone(); } + + + @ConditionalOnExpression("!'${chat.backend.inmemory.storage-strategy}'.toLowerCase().equals('mongodb')") + @Configuration + @EnableAutoConfiguration(exclude = { + MongoReactiveDataAutoConfiguration.class, + MongoReactiveAutoConfiguration.class, + MongoReactiveRepositoriesAutoConfiguration.class }) + public static class DisableMongoDBConfiguration + { + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageConfiguration.java index 72f8e7d5..45175c9f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/files/FilesStorageConfiguration.java @@ -4,11 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.implementation.ShardingStrategy; import de.juplo.kafka.chat.backend.implementation.StorageStrategy; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.boot.autoconfigure.data.mongo.MongoReactiveDataAutoConfiguration; -import org.springframework.boot.autoconfigure.data.mongo.MongoReactiveRepositoriesAutoConfiguration; -import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -20,11 +16,6 @@ import java.nio.file.Paths; name = "storage-strategy", havingValue = "files") @Configuration -@EnableAutoConfiguration( - exclude = { - MongoReactiveDataAutoConfiguration.class, - MongoReactiveRepositoriesAutoConfiguration.class, - MongoAutoConfiguration.class }) public class FilesStorageConfiguration { @Bean diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageConfiguration.java index dc2bdfcc..0d50b892 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/nostorage/NoStorageStorageConfiguration.java @@ -1,11 +1,7 @@ package de.juplo.kafka.chat.backend.storage.nostorage; import de.juplo.kafka.chat.backend.implementation.StorageStrategy; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.boot.autoconfigure.data.mongo.MongoReactiveDataAutoConfiguration; -import org.springframework.boot.autoconfigure.data.mongo.MongoReactiveRepositoriesAutoConfiguration; -import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -16,11 +12,6 @@ import org.springframework.context.annotation.Configuration; havingValue = "none", matchIfMissing = true) @Configuration -@EnableAutoConfiguration( - exclude = { - MongoReactiveDataAutoConfiguration.class, - MongoReactiveRepositoriesAutoConfiguration.class, - MongoAutoConfiguration.class }) public class NoStorageStorageConfiguration { @Bean diff --git a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesStorageIT.java index e4eaf3ab..0d63b7ed 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesStorageIT.java @@ -1,8 +1,13 @@ package de.juplo.kafka.chat.backend; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeEach; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.TestPropertySource; import java.io.IOException; @@ -15,6 +20,7 @@ import java.nio.file.Paths; "chat.backend.inmemory.sharding-strategy=none", "chat.backend.inmemory.storage-strategy=files", "chat.backend.inmemory.storage-directory=target/files" }) +@ContextConfiguration(classes = InMemoryWithFilesStorageIT.TestConfig.class) @Slf4j public class InMemoryWithFilesStorageIT extends AbstractInMemoryStorageIT { @@ -47,4 +53,17 @@ public class InMemoryWithFilesStorageIT extends AbstractInMemoryStorageIT Files.delete(path); } } + + + static class TestConfig + { + @Bean + ObjectMapper objectMapper() + { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.registerModule(new JavaTimeModule()); + objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + return objectMapper; + } + } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceTest.java index 858d03d3..d859b141 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceTest.java @@ -1,5 +1,7 @@ package de.juplo.kafka.chat.backend.domain; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException; @@ -79,6 +81,14 @@ public abstract class ChatHomeServiceTest static class TestConfiguration { + @Bean + ObjectMapper objectMapper() + { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.registerModule(new JavaTimeModule()); + return objectMapper; + } + @Bean Clock clock() { -- 2.20.1 From e2933c8acd7ffd1c3a8530b0d34621c183ef09fa Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 24 Feb 2024 12:56:24 +0100 Subject: [PATCH 13/16] fix: The number of chards is the number of partitions of the `data_channel` --- .../kafka/chat/backend/implementation/kafka/InfoChannel.java | 5 ++--- .../implementation/kafka/KafkaServicesConfiguration.java | 1 + 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index 2df7573c..f5d5253c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -48,6 +48,7 @@ public class InfoChannel implements Runnable String topic, Producer producer, Consumer infoChannelConsumer, + int numShards, URI instanceUri) { log.debug( @@ -58,9 +59,7 @@ public class InfoChannel implements Runnable this.producer = producer; this.chatRoomInfo = new HashMap<>(); - this.numShards = consumer - .partitionsFor(topic) - .size(); + this.numShards = numShards; this.shardOwners = new String[numShards]; this.currentOffset = new long[numShards]; this.nextOffset = new long[numShards]; diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index b5a442f8..5a41ebcc 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -130,6 +130,7 @@ public class KafkaServicesConfiguration properties.getKafka().getInfoChannelTopic(), producer, infoChannelConsumer, + properties.getKafka().getNumPartitions(), properties.getKafka().getInstanceUri()); } -- 2.20.1 From e73b46955f6b7aff3aa1ecb79b2aee75fa76b097 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 25 Feb 2024 20:50:21 +0100 Subject: [PATCH 14/16] feat: Made polling-interval for Kafka configurable --- .../de/juplo/kafka/chat/backend/ChatBackendProperties.java | 2 ++ .../chat/backend/implementation/kafka/DataChannel.java | 5 ++++- .../chat/backend/implementation/kafka/InfoChannel.java | 6 +++++- .../implementation/kafka/KafkaServicesConfiguration.java | 2 ++ 4 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index df4d1cd7..cfc296b1 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -6,6 +6,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import java.net.URI; import java.nio.file.Paths; +import java.time.Duration; import java.util.logging.Level; @@ -45,6 +46,7 @@ public class ChatBackendProperties private String infoChannelTopic = "info_channel"; private String dataChannelTopic = "data_channel"; private int numPartitions = 2; + private Duration pollingInterval = Duration.ofSeconds(1); private String haproxyRuntimeApi = "haproxy:8401"; private String haproxyMap = "/usr/local/etc/haproxy/sharding.map"; } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index f139b765..fdb16fbd 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -28,6 +28,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener private final Consumer consumer; private final ZoneId zoneId; private final int numShards; + private final Duration pollingInterval; private final int bufferSize; private final Clock clock; private final boolean[] isShardOwned; @@ -49,6 +50,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener Consumer dataChannelConsumer, ZoneId zoneId, int numShards, + Duration pollingInterval, int bufferSize, Clock clock, InfoChannel infoChannel, @@ -65,6 +67,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener this.producer = producer; this.zoneId = zoneId; this.numShards = numShards; + this.pollingInterval = pollingInterval; this.bufferSize = bufferSize; this.clock = clock; this.isShardOwned = new boolean[numShards]; @@ -189,7 +192,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); + ConsumerRecords records = consumer.poll(pollingInterval); log.info("Fetched {} messages", records.count()); if (loadInProgress) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index f5d5253c..a6351d0f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -32,6 +32,7 @@ public class InfoChannel implements Runnable private final String topic; private final Producer producer; private final Consumer consumer; + private final Duration pollingInterval; private final int numShards; private final String[] shardOwners; private final long[] currentOffset; @@ -48,6 +49,7 @@ public class InfoChannel implements Runnable String topic, Producer producer, Consumer infoChannelConsumer, + Duration pollingInterval, int numShards, URI instanceUri) { @@ -59,6 +61,8 @@ public class InfoChannel implements Runnable this.producer = producer; this.chatRoomInfo = new HashMap<>(); + this.pollingInterval = pollingInterval; + this.numShards = numShards; this.shardOwners = new String[numShards]; this.currentOffset = new long[numShards]; @@ -191,7 +195,7 @@ public class InfoChannel implements Runnable { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); + ConsumerRecords records = consumer.poll(pollingInterval); log.debug("Fetched {} messages", records.count()); for (ConsumerRecord record : records) { diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 5a41ebcc..6b7c1566 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -130,6 +130,7 @@ public class KafkaServicesConfiguration properties.getKafka().getInfoChannelTopic(), producer, infoChannelConsumer, + properties.getKafka().getPollingInterval(), properties.getKafka().getNumPartitions(), properties.getKafka().getInstanceUri()); } @@ -151,6 +152,7 @@ public class KafkaServicesConfiguration dataChannelConsumer, zoneId, properties.getKafka().getNumPartitions(), + properties.getKafka().getPollingInterval(), properties.getChatroomBufferSize(), clock, infoChannel, -- 2.20.1 From fd0c7edb6ca97b53a13d8e6d1bf5149d53d36a5f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 25 Feb 2024 21:44:16 +0100 Subject: [PATCH 15/16] fix: Sensible value for sink-buffer --- .../java/de/juplo/kafka/chat/backend/ChatBackendProperties.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index cfc296b1..c3609a39 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -17,7 +17,7 @@ public class ChatBackendProperties { private String instanceId = "DEV"; private String allowedOrigins = "http://localhost:4200"; - private int chatroomBufferSize = 8; + private int chatroomBufferSize = 1024; private ServiceType services = ServiceType.inmemory; private InMemoryServicesProperties inmemory = new InMemoryServicesProperties(); private KafkaServicesProperties kafka = new KafkaServicesProperties(); -- 2.20.1 From 13f86063f851fc2c4ad6de56c8edb78bff9d0592 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 26 Feb 2024 19:55:19 +0100 Subject: [PATCH 16/16] fix: Errors during shard-publishing should not kill the instance * `HaproxyShardingPublisherStrategy` has to transform any exception into a `Mono.error()`. * `DataChannel.onPartitionsAssigned(..)` has to log and swallow errors during the propagation of the shard-ownership. --- .../haproxy/HaproxyShardingPublisherStrategy.java | 3 +-- .../kafka/chat/backend/implementation/kafka/DataChannel.java | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java index 3caaeb38..ad71d497 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/haproxy/HaproxyShardingPublisherStrategy.java @@ -5,7 +5,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Mono; -import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; @@ -33,7 +32,7 @@ public class HaproxyShardingPublisherStrategy implements ShardingPublisherStrate socketChannel.close(); return Mono.just(instanceId); } - catch (IOException e) + catch (Exception e) { return Mono.error(e); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index fdb16fbd..b4cc33f5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -155,7 +155,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener "Could not publish instance {} as owner of shard {}: {}", instanceId, partition, - throwable)) + throwable.toString())) + .onErrorComplete() .block(); }); -- 2.20.1