From: Kai Moritz Date: Sat, 2 Sep 2023 17:16:26 +0000 (+0200) Subject: refactor: Simplified implementation - Removed interface `ChatRoomFactory` X-Git-Tag: rebase--2024-01-27--15-46~37 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=a39837c0ddf444dd98b371eaf8226ad865543519;p=demos%2Fkafka%2Fchat refactor: Simplified implementation - Removed interface `ChatRoomFactory` * Moved method `ChatRoomFactory.createChatRoom(UUID, String)` to `ChatHome`. * Allowed `null`-values for `ChatRoom.shard`. * Moved logic from `InMemoryChatHomeService` into `SimpleChatHome` respective `ShardedChatHome` and removed obsolete class. * Adapted the configuration of the tests to the model changes: --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java index 339451a8..f41f45f6 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java @@ -2,7 +2,6 @@ package de.juplo.kafka.chat.backend.api; import de.juplo.kafka.chat.backend.domain.ChatHome; import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import lombok.RequiredArgsConstructor; import org.springframework.http.codec.ServerSentEvent; @@ -18,7 +17,6 @@ import java.util.UUID; public class ChatBackendController { private final ChatHome chatHome; - private final ChatRoomFactory factory; private final StorageStrategy storageStrategy; @@ -26,7 +24,7 @@ public class ChatBackendController public Mono create(@RequestBody String name) { UUID chatRoomId = UUID.randomUUID(); - return factory + return chatHome .createChatRoom(chatRoomId, name) .map(ChatRoomInfoTo::from); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomInfoTo.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomInfoTo.java index 212fb8d5..18f37117 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomInfoTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomInfoTo.java @@ -10,7 +10,7 @@ public class ChatRoomInfoTo { private UUID id; private String name; - private int shard; + private Integer shard; public static ChatRoomInfoTo from(ChatRoomInfo info) diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java index 6091c0c5..e4d92dbb 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java @@ -8,6 +8,8 @@ import java.util.UUID; public interface ChatHome { + Mono createChatRoom(UUID id, String name); + Mono getChatRoom(UUID id); Flux getChatRooms(); diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java index b9463095..c66b887d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java @@ -26,7 +26,7 @@ public class ChatRoom extends ChatRoomInfo public ChatRoom( UUID id, String name, - int shard, + Integer shard, Clock clock, ChatRoomService service, int bufferSize) diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java deleted file mode 100644 index 603795d9..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka.chat.backend.domain; - -import reactor.core.publisher.Mono; - -import java.util.UUID; - - -public interface ChatRoomFactory -{ - Mono createChatRoom(UUID id, String name); -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java index 6d88be95..33c522d1 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java @@ -18,5 +18,5 @@ public class ChatRoomInfo @Getter private final String name; @Getter - private final int shard; + private final Integer shard; } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java deleted file mode 100644 index 29f13127..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java +++ /dev/null @@ -1,76 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.inmemory; - -import de.juplo.kafka.chat.backend.domain.ChatRoom; -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.util.*; -import java.util.stream.IntStream; - - -@Slf4j -public class InMemoryChatHomeService -{ - private final Map[] chatrooms; - - - public InMemoryChatHomeService( - int numShards, - int[] ownedShards, - Flux chatroomFlux) - { - log.debug("Creating InMemoryChatHomeService"); - this.chatrooms = new Map[numShards]; - Set owned = Arrays - .stream(ownedShards) - .collect( - () -> new HashSet<>(), - (set, i) -> set.add(i), - (a, b) -> a.addAll(b)); - for (int shard = 0; shard < numShards; shard++) - { - chatrooms[shard] = owned.contains(shard) - ? new HashMap<>() - : null; - } - chatroomFlux - .filter(chatRoom -> - { - if (owned.contains(chatRoom.getShard())) - { - return true; - } - else - { - log.info("Ignoring not owned chat-room {}", chatRoom); - return false; - } - }) - .toStream() - .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom)); - } - - public void putChatRoom(ChatRoom chatRoom) - { - chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom); - } - - public Mono getChatRoom(int shard, UUID id) - { - return Mono.justOrEmpty(chatrooms[shard].get(id)); - } - - public int[] getOwnedShards() - { - return IntStream - .range(0, chatrooms.length) - .filter(i -> chatrooms[i] != null) - .toArray(); - } - - public Flux getChatRooms(int shard) - { - return Flux.fromStream(chatrooms[shard].values().stream()); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java deleted file mode 100644 index 2bde2361..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java +++ /dev/null @@ -1,33 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.inmemory; - -import de.juplo.kafka.chat.backend.domain.*; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.time.Clock; -import java.util.UUID; - - -@RequiredArgsConstructor -@Slf4j -public class InMemoryChatRoomFactory implements ChatRoomFactory -{ - private final InMemoryChatHomeService chatHomeService; - private final ShardingStrategy shardingStrategy; - private final Clock clock; - private final int bufferSize; - - - @Override - public Mono createChatRoom(UUID id, String name) - { - log.info("Creating ChatRoom with buffer-size {}", bufferSize); - int shard = shardingStrategy.selectShard(id); - ChatRoomService service = new InMemoryChatRoomService(Flux.empty()); - ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); - chatHomeService.putChatRoom(chatRoom); - return Mono.just(chatRoom); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java index 375ed721..106c7369 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java @@ -1,7 +1,6 @@ package de.juplo.kafka.chat.backend.persistence.inmemory; import de.juplo.kafka.chat.backend.ChatBackendProperties; -import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType; import de.juplo.kafka.chat.backend.domain.ChatHome; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -26,9 +25,15 @@ public class InMemoryServicesConfiguration name = "sharding-strategy", havingValue = "none", matchIfMissing = true) - ChatHome noneShardingChatHome(InMemoryChatHomeService chatHomeService) + ChatHome noneShardingChatHome( + ChatBackendProperties properties, + StorageStrategy storageStrategy, + Clock clock) { - return new SimpleChatHome(chatHomeService); + return new SimpleChatHome( + storageStrategy.read(), + clock, + properties.getChatroomBufferSize()); } @Bean @@ -38,50 +43,22 @@ public class InMemoryServicesConfiguration havingValue = "kafkalike") ChatHome kafkalikeShardingChatHome( ChatBackendProperties properties, - InMemoryChatHomeService chatHomeService) + StorageStrategy storageStrategy, + Clock clock) { int numShards = properties.getInmemory().getNumShards(); SimpleChatHome[] chatHomes = new SimpleChatHome[numShards]; IntStream .of(properties.getInmemory().getOwnedShards()) - .forEach(shard -> chatHomes[shard] = new SimpleChatHome(chatHomeService, shard)); + .forEach(shard -> chatHomes[shard] = new SimpleChatHome( + shard, + storageStrategy.read(), + clock, + properties.getChatroomBufferSize())); ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); return new ShardedChatHome(chatHomes, strategy); } - @Bean - InMemoryChatHomeService chatHomeService( - ChatBackendProperties properties, - StorageStrategy storageStrategy) - { - ShardingStrategyType sharding = - properties.getInmemory().getShardingStrategy(); - int numShards = sharding == ShardingStrategyType.none - ? 1 - : properties.getInmemory().getNumShards(); - int[] ownedShards = sharding == ShardingStrategyType.none - ? new int[] { 0 } - : properties.getInmemory().getOwnedShards(); - return new InMemoryChatHomeService( - numShards, - ownedShards, - storageStrategy.read()); - } - - @Bean - InMemoryChatRoomFactory chatRoomFactory( - InMemoryChatHomeService service, - ShardingStrategy strategy, - Clock clock, - ChatBackendProperties properties) - { - return new InMemoryChatRoomFactory( - service, - strategy, - clock, - properties.getChatroomBufferSize()); - } - @ConditionalOnProperty( prefix = "chat.backend.inmemory", name = "sharding-strategy", diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java index ac7a9808..c6aff1e3 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java @@ -1,8 +1,6 @@ package de.juplo.kafka.chat.backend.persistence.inmemory; -import de.juplo.kafka.chat.backend.domain.ChatHome; -import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException; +import de.juplo.kafka.chat.backend.domain.*; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -40,13 +38,29 @@ public class ShardedChatHome implements ChatHome } + @Override + public Mono createChatRoom(UUID id, String name) + { + int shard = shardingStrategy.selectShard(id); + return chatHomes[shard] == null + ? Mono.error(new ShardNotOwnedException(shard)) + : chatHomes[shard].createChatRoom(id, name); + } + @Override public Mono getChatRoom(UUID id) { int shard = selectShard(id); return chatHomes[shard] == null ? Mono.error(new ShardNotOwnedException(shard)) - : chatHomes[shard].getChatRoom(id); + : chatHomes[shard] + .getChatRoom(id) + .onErrorMap(throwable -> throwable instanceof UnknownChatroomException + ? new UnknownChatroomException( + id, + shard, + ownedShards.stream().mapToInt(i -> i.intValue()).toArray()) + : throwable); } @Override diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java index f99bc9d8..c2d25b20 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java @@ -1,49 +1,86 @@ package de.juplo.kafka.chat.backend.persistence.inmemory; -import de.juplo.kafka.chat.backend.domain.ChatHome; -import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.UnknownChatroomException; +import de.juplo.kafka.chat.backend.domain.*; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.time.Clock; import java.util.*; @Slf4j public class SimpleChatHome implements ChatHome { - private final InMemoryChatHomeService service; - private final int shard; + private final Integer shard; + private final Map chatRooms; + private final Clock clock; + private final int bufferSize; - public SimpleChatHome(InMemoryChatHomeService service, int shard) + + public SimpleChatHome( + Flux chatroomFlux, + Clock clock, + int bufferSize) + { + this(null, chatroomFlux, clock, bufferSize); + } + + public SimpleChatHome( + Integer shard, + Flux chatroomFlux, + Clock clock, + int bufferSize) { log.info("Created SimpleChatHome for shard {}", shard); - this.service = service; +; this.shard = shard; + this.chatRooms = new HashMap<>(); + chatroomFlux + .filter(chatRoom -> + { + if (shard == null || chatRoom.getShard() == shard) + { + return true; + } + else + { + log.info( + "SimpleChatHome for shard {} ignores not owned chat-room {}", + shard, + chatRoom); + return false; + } + }) + .toStream() + .forEach(chatroom -> chatRooms.put(chatroom.getId(), chatroom)); + this.clock = clock; + this.bufferSize = bufferSize; } - public SimpleChatHome(InMemoryChatHomeService service) + + @Override + public Mono createChatRoom(UUID id, String name) { - this(service, 0); + log.info("Creating ChatRoom with buffer-size {}", bufferSize); + ChatRoomService service = new InMemoryChatRoomService(Flux.empty()); + ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); + chatRooms.put(id, chatRoom); + return Mono.just(chatRoom); } - @Override public Mono getChatRoom(UUID id) { - return service - .getChatRoom(shard, id) - .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( - id, - shard, - service.getOwnedShards()))); + return Mono + .justOrEmpty(chatRooms.get(id)) + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); } @Override public Flux getChatRooms() { - return service.getChatRooms(shard); + return Flux.fromIterable(chatRooms.values()); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java index 07fb8858..06228396 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java @@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatHome; import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.domain.UnknownChatroomException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -20,6 +21,14 @@ public class KafkaChatHome implements ChatHome private final ChatRoomChannel chatRoomChannel; + + @Override + public Mono createChatRoom(UUID id, String name) + { + log.info("Sending create-command for chat rooom: id={}, name={}"); + return chatRoomChannel.sendCreateChatRoomRequest(id, name); + } + @Override public Mono getChatRoom(UUID id) { diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java deleted file mode 100644 index 6a1dc78a..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java +++ /dev/null @@ -1,24 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; -import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Mono; - -import java.util.UUID; - - -@RequiredArgsConstructor -@Slf4j -public class KafkaChatRoomFactory implements ChatRoomFactory -{ - private final ChatRoomChannel chatRoomChannel; - - @Override - public Mono createChatRoom(UUID id, String name) - { - log.info("Sending create-command for chat rooom: id={}, name={}"); - return chatRoomChannel.sendCreateChatRoomRequest(id, name); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index 1cd41b53..df4faed8 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -43,12 +43,6 @@ public class KafkaServicesConfiguration chatRoomChannel); } - @Bean - KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel) - { - return new KafkaChatRoomFactory(chatRoomChannel); - } - @Bean ChatRoomChannel chatRoomChannel( ChatBackendProperties properties, diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java index bff02a36..62dc08ad 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java @@ -1,9 +1,6 @@ package de.juplo.kafka.chat.backend.persistence; import de.juplo.kafka.chat.backend.domain.ChatHome; -import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; -import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService; -import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory; import de.juplo.kafka.chat.backend.persistence.inmemory.SimpleChatHome; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -22,30 +19,18 @@ public abstract class AbstractInMemoryStorageIT extends AbstractStorageStrategyI { return new StorageStrategyITConfig() { - InMemoryChatHomeService inMemoryChatHomeService = new InMemoryChatHomeService( - 1, - new int[] { 0 }, - getStorageStrategy().read()); + int bufferSize = 8; - SimpleChatHome simpleChatHome = new SimpleChatHome(inMemoryChatHomeService); - - InMemoryChatRoomFactory chatRoomFactory = new InMemoryChatRoomFactory( - inMemoryChatHomeService, - chatRoomId -> 0, + SimpleChatHome simpleChatHome = new SimpleChatHome( + getStorageStrategy().read(), clock, - 8); + bufferSize); @Override public ChatHome getChatHome() { return simpleChatHome; } - - @Override - public ChatRoomFactory getChatRoomFactory() - { - return chatRoomFactory; - } }; } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java index 58acb192..2a42a128 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java @@ -14,7 +14,6 @@ import static pl.rzrz.assertj.reactor.Assertions.*; public abstract class AbstractStorageStrategyIT { protected ChatHome chathome; - protected ChatRoomFactory chatRoomFactory; protected abstract StorageStrategy getStorageStrategy(); @@ -24,7 +23,6 @@ public abstract class AbstractStorageStrategyIT { StorageStrategyITConfig config = getConfig(); chathome = config.getChatHome(); - chatRoomFactory = config.getChatRoomFactory(); } protected void stop() @@ -40,7 +38,7 @@ public abstract class AbstractStorageStrategyIT assertThat(chathome.getChatRooms().toStream()).hasSize(0); UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); - ChatRoomInfo info = chatRoomFactory.createChatRoom(chatRoomId, "FOO").block(); + ChatRoomInfo info = chathome.createChatRoom(chatRoomId, "FOO").block(); log.debug("Created chat-room {}", info); ChatRoom chatroom = chathome.getChatRoom(chatRoomId).block(); Message m1 = chatroom.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block(); @@ -72,7 +70,7 @@ public abstract class AbstractStorageStrategyIT assertThat(chathome.getChatRooms().toStream()).hasSize(0); UUID chatRoomAId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); - ChatRoomInfo infoA = chatRoomFactory.createChatRoom(chatRoomAId, "FOO").block(); + ChatRoomInfo infoA = chathome.createChatRoom(chatRoomAId, "FOO").block(); log.debug("Created chat-room {}", infoA); ChatRoom chatroomA = chathome.getChatRoom(chatRoomAId).block(); Message ma1 = chatroomA.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block(); @@ -81,7 +79,7 @@ public abstract class AbstractStorageStrategyIT Message ma4 = chatroomA.addMessage(1l, "klaus", "Ja? Nein? Vielleicht??").block(); UUID chatRoomBId = UUID.fromString("8763dfdc-4dda-4a74-bea4-4b389177abea"); - ChatRoomInfo infoB = chatRoomFactory.createChatRoom(chatRoomBId, "BAR").block(); + ChatRoomInfo infoB = chathome.createChatRoom(chatRoomBId, "BAR").block(); log.debug("Created chat-room {}", infoB); ChatRoom chatroomB = chathome.getChatRoom(chatRoomBId).block(); Message mb1 = chatroomB.addMessage(1l,"peter", "Hallo, ich heiße Uwe!").block(); @@ -117,6 +115,5 @@ public abstract class AbstractStorageStrategyIT interface StorageStrategyITConfig { ChatHome getChatHome(); - ChatRoomFactory getChatRoomFactory(); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java index 2370cbe1..e2ffd3a5 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java @@ -18,13 +18,18 @@ public class ShardedChatHomeTest extends ChatHomeWithShardsTest { @Bean ShardedChatHome chatHome( - InMemoryChatHomeService chatHomeService) + StorageStrategy storageStrategy, + Clock clock) { SimpleChatHome[] chatHomes = new SimpleChatHome[NUM_SHARDS]; IntStream .of(ownedShards()) - .forEach(shard -> chatHomes[shard] = new SimpleChatHome(chatHomeService, shard)); + .forEach(shard -> chatHomes[shard] = new SimpleChatHome( + shard, + storageStrategy.read(), + clock, + bufferSize())); ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS); @@ -32,30 +37,31 @@ public class ShardedChatHomeTest extends ChatHomeWithShardsTest } @Bean - InMemoryChatHomeService chatHomeService( - StorageStrategy storageStrategy) - { - return new InMemoryChatHomeService( - NUM_SHARDS, - ownedShards(), - storageStrategy.read()); - } - - @Bean - public FilesStorageStrategy storageStrategy() + public FilesStorageStrategy storageStrategy(Clock clock) { return new FilesStorageStrategy( Paths.get("target", "test-classes", "data", "files"), - Clock.systemDefaultZone(), - 8, + clock, + bufferSize(), new KafkaLikeShardingStrategy(NUM_SHARDS), messageFlux -> new InMemoryChatRoomService(messageFlux), new ObjectMapper()); } + @Bean + Clock clock() + { + return Clock.systemDefaultZone(); + } + int[] ownedShards() { return new int[] { OWNED_SHARD }; } + + int bufferSize() + { + return 8; + } } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java index 761e700c..190d0f24 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java @@ -17,30 +17,37 @@ public class SimpleChatHomeTest extends ChatHomeTest static class Configuration { @Bean - SimpleChatHome chatHome(InMemoryChatHomeService chatHomeService) + SimpleChatHome chatHome( + StorageStrategy storageStrategy, + Clock clock) { - return new SimpleChatHome(chatHomeService); + return new SimpleChatHome( + storageStrategy.read(), + clock, + bufferSize()); } @Bean - InMemoryChatHomeService chatHomeService(StorageStrategy storageStrategy) - { - return new InMemoryChatHomeService( - 1, - new int[] { 0 }, - storageStrategy.read()); - } - - @Bean - public FilesStorageStrategy storageStrategy() + public FilesStorageStrategy storageStrategy(Clock clock) { return new FilesStorageStrategy( Paths.get("target", "test-classes", "data", "files"), - Clock.systemDefaultZone(), - 8, + clock, + bufferSize(), chatRoomId -> 0, messageFlux -> new InMemoryChatRoomService(messageFlux), new ObjectMapper()); } + + @Bean + Clock clock() + { + return Clock.systemDefaultZone(); + } + + int bufferSize() + { + return 8; + } } }