From 61b76ad936e83f35f2e6a72d57fecf08b9999207 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 3 Sep 2023 17:43:56 +0200 Subject: [PATCH] WIP --- .../InMemoryServicesConfiguration.java | 31 ++++++++-------- .../AbstractStorageStrategyIT.java | 9 ++--- .../inmemory/ShardedChatHomeTest.java | 36 +++++++++++-------- .../inmemory/SimpleChatHomeTest.java | 35 ++++++++++-------- 4 files changed, 60 insertions(+), 51 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java index ee41c4f3..106c7369 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java @@ -25,9 +25,15 @@ public class InMemoryServicesConfiguration name = "sharding-strategy", havingValue = "none", matchIfMissing = true) - ChatHome noneShardingChatHome(StorageStrategy storageStrategy) + ChatHome noneShardingChatHome( + ChatBackendProperties properties, + StorageStrategy storageStrategy, + Clock clock) { - return new SimpleChatHome(storageStrategy.read()); + return new SimpleChatHome( + storageStrategy.read(), + clock, + properties.getChatroomBufferSize()); } @Bean @@ -37,29 +43,22 @@ public class InMemoryServicesConfiguration havingValue = "kafkalike") ChatHome kafkalikeShardingChatHome( ChatBackendProperties properties, - StorageStrategy storageStrategy) + StorageStrategy storageStrategy, + Clock clock) { int numShards = properties.getInmemory().getNumShards(); SimpleChatHome[] chatHomes = new SimpleChatHome[numShards]; IntStream .of(properties.getInmemory().getOwnedShards()) - .forEach(shard -> chatHomes[shard] = new SimpleChatHome(shard, storageStrategy.read())); + .forEach(shard -> chatHomes[shard] = new SimpleChatHome( + shard, + storageStrategy.read(), + clock, + properties.getChatroomBufferSize())); ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); return new ShardedChatHome(chatHomes, strategy); } - @Bean - InMemoryChatRoomFactory chatRoomFactory( - ShardingStrategy strategy, - Clock clock, - ChatBackendProperties properties) - { - return new InMemoryChatRoomFactory( - strategy, - clock, - properties.getChatroomBufferSize()); - } - @ConditionalOnProperty( prefix = "chat.backend.inmemory", name = "sharding-strategy", diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java index 58acb192..2a42a128 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java @@ -14,7 +14,6 @@ import static pl.rzrz.assertj.reactor.Assertions.*; public abstract class AbstractStorageStrategyIT { protected ChatHome chathome; - protected ChatRoomFactory chatRoomFactory; protected abstract StorageStrategy getStorageStrategy(); @@ -24,7 +23,6 @@ public abstract class AbstractStorageStrategyIT { StorageStrategyITConfig config = getConfig(); chathome = config.getChatHome(); - chatRoomFactory = config.getChatRoomFactory(); } protected void stop() @@ -40,7 +38,7 @@ public abstract class AbstractStorageStrategyIT assertThat(chathome.getChatRooms().toStream()).hasSize(0); UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); - ChatRoomInfo info = chatRoomFactory.createChatRoom(chatRoomId, "FOO").block(); + ChatRoomInfo info = chathome.createChatRoom(chatRoomId, "FOO").block(); log.debug("Created chat-room {}", info); ChatRoom chatroom = chathome.getChatRoom(chatRoomId).block(); Message m1 = chatroom.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block(); @@ -72,7 +70,7 @@ public abstract class AbstractStorageStrategyIT assertThat(chathome.getChatRooms().toStream()).hasSize(0); UUID chatRoomAId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); - ChatRoomInfo infoA = chatRoomFactory.createChatRoom(chatRoomAId, "FOO").block(); + ChatRoomInfo infoA = chathome.createChatRoom(chatRoomAId, "FOO").block(); log.debug("Created chat-room {}", infoA); ChatRoom chatroomA = chathome.getChatRoom(chatRoomAId).block(); Message ma1 = chatroomA.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block(); @@ -81,7 +79,7 @@ public abstract class AbstractStorageStrategyIT Message ma4 = chatroomA.addMessage(1l, "klaus", "Ja? Nein? Vielleicht??").block(); UUID chatRoomBId = UUID.fromString("8763dfdc-4dda-4a74-bea4-4b389177abea"); - ChatRoomInfo infoB = chatRoomFactory.createChatRoom(chatRoomBId, "BAR").block(); + ChatRoomInfo infoB = chathome.createChatRoom(chatRoomBId, "BAR").block(); log.debug("Created chat-room {}", infoB); ChatRoom chatroomB = chathome.getChatRoom(chatRoomBId).block(); Message mb1 = chatroomB.addMessage(1l,"peter", "Hallo, ich heiße Uwe!").block(); @@ -117,6 +115,5 @@ public abstract class AbstractStorageStrategyIT interface StorageStrategyITConfig { ChatHome getChatHome(); - ChatRoomFactory getChatRoomFactory(); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java index 2370cbe1..e2ffd3a5 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java @@ -18,13 +18,18 @@ public class ShardedChatHomeTest extends ChatHomeWithShardsTest { @Bean ShardedChatHome chatHome( - InMemoryChatHomeService chatHomeService) + StorageStrategy storageStrategy, + Clock clock) { SimpleChatHome[] chatHomes = new SimpleChatHome[NUM_SHARDS]; IntStream .of(ownedShards()) - .forEach(shard -> chatHomes[shard] = new SimpleChatHome(chatHomeService, shard)); + .forEach(shard -> chatHomes[shard] = new SimpleChatHome( + shard, + storageStrategy.read(), + clock, + bufferSize())); ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS); @@ -32,30 +37,31 @@ public class ShardedChatHomeTest extends ChatHomeWithShardsTest } @Bean - InMemoryChatHomeService chatHomeService( - StorageStrategy storageStrategy) - { - return new InMemoryChatHomeService( - NUM_SHARDS, - ownedShards(), - storageStrategy.read()); - } - - @Bean - public FilesStorageStrategy storageStrategy() + public FilesStorageStrategy storageStrategy(Clock clock) { return new FilesStorageStrategy( Paths.get("target", "test-classes", "data", "files"), - Clock.systemDefaultZone(), - 8, + clock, + bufferSize(), new KafkaLikeShardingStrategy(NUM_SHARDS), messageFlux -> new InMemoryChatRoomService(messageFlux), new ObjectMapper()); } + @Bean + Clock clock() + { + return Clock.systemDefaultZone(); + } + int[] ownedShards() { return new int[] { OWNED_SHARD }; } + + int bufferSize() + { + return 8; + } } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java index 761e700c..190d0f24 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java @@ -17,30 +17,37 @@ public class SimpleChatHomeTest extends ChatHomeTest static class Configuration { @Bean - SimpleChatHome chatHome(InMemoryChatHomeService chatHomeService) + SimpleChatHome chatHome( + StorageStrategy storageStrategy, + Clock clock) { - return new SimpleChatHome(chatHomeService); + return new SimpleChatHome( + storageStrategy.read(), + clock, + bufferSize()); } @Bean - InMemoryChatHomeService chatHomeService(StorageStrategy storageStrategy) - { - return new InMemoryChatHomeService( - 1, - new int[] { 0 }, - storageStrategy.read()); - } - - @Bean - public FilesStorageStrategy storageStrategy() + public FilesStorageStrategy storageStrategy(Clock clock) { return new FilesStorageStrategy( Paths.get("target", "test-classes", "data", "files"), - Clock.systemDefaultZone(), - 8, + clock, + bufferSize(), chatRoomId -> 0, messageFlux -> new InMemoryChatRoomService(messageFlux), new ObjectMapper()); } + + @Bean + Clock clock() + { + return Clock.systemDefaultZone(); + } + + int bufferSize() + { + return 8; + } } } -- 2.20.1