From: Kai Moritz Date: Fri, 24 Feb 2023 08:47:35 +0000 (+0100) Subject: refactor: Pushed sharding one layer down in the architecture X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=e1ad66ecb1dc386bb357e364a05b071ec45920e1;p=demos%2Fkafka%2Fchat refactor: Pushed sharding one layer down in the architecture - The packages `domain` and `api` does not have to know anything about sharding. - This is a preparation for the implementation of a persistence implementation, that is based on Kafka. - Also simplifies the configuration in `InMemoryServiceConfiguration`. --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java index 8e1ff9e5..c61f8488 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java @@ -16,7 +16,7 @@ public class ChatBackendApplication implements WebFluxConfigurer @Autowired ChatBackendProperties properties; @Autowired - ChatHome[] chatHomes; + ChatHome chatHome; @Autowired StorageStrategy storageStrategy; @@ -32,8 +32,7 @@ public class ChatBackendApplication implements WebFluxConfigurer @PreDestroy public void onExit() { - for (int shard = 0; shard < chatHomes.length; shard++) - storageStrategy.write(chatHomes[shard].getChatRooms()); + storageStrategy.write(chatHome.getChatRooms()); } public static void main(String[] args) diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java index 6091c0c5..557cf75f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java @@ -1,14 +1,29 @@ package de.juplo.kafka.chat.backend.domain; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.util.UUID; +import java.util.*; -public interface ChatHome +@RequiredArgsConstructor +@Slf4j +public class ChatHome { - Mono getChatRoom(UUID id); + private final ChatHomeService service; - Flux getChatRooms(); + + public Mono getChatRoom(UUID id) + { + return service + .getChatRoom(id) + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); + } + + public Flux getChatRooms() + { + return service.getChatRooms(); + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java index 19ff4aa4..7876130d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java @@ -8,6 +8,6 @@ import java.util.UUID; public interface ChatHomeService { - Mono getChatRoom(int shard, UUID id); - Flux getChatRooms(int shard); + Mono getChatRoom(UUID id); + Flux getChatRooms(); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardNotOwnedException.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardNotOwnedException.java new file mode 100644 index 00000000..d467eabb --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardNotOwnedException.java @@ -0,0 +1,68 @@ +package de.juplo.kafka.chat.backend.domain; + +import lombok.Getter; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.stream.Collectors; + + +public class ShardNotOwnedException extends IllegalStateException +{ + @Getter + private final ChatHomeService chatHomeService; + @Getter + private final ChatRoomInfo chatRoomInfo; + @Getter + private final int shard; + @Getter + private final int[] ownedShards; + + + public ShardNotOwnedException( + ChatHomeService chatHomeService, + ChatRoomInfo chatRoomInfo, + int shard, + Collection ownedShards) + { + this( + chatHomeService, + chatRoomInfo, + shard, + ShardNotOwnedException.toArray(ownedShards)); + } + + public ShardNotOwnedException( + ChatHomeService chatHomeService, + ChatRoomInfo chatRoomInfo, + int shard, + int[] ownedShards) + { + super( + chatHomeService + + " does not own the shard " + + shard + + " for ChatRoom " + + chatRoomInfo + + " owned shards: " + + Arrays + .stream(ownedShards) + .mapToObj(ownedShard -> Integer.toString(ownedShard)) + .collect(Collectors.joining(", "))); + this.chatHomeService = chatHomeService; + this.chatRoomInfo = chatRoomInfo; + this.shard = shard; + this.ownedShards = ownedShards; + } + + + private static int[] toArray(Collection collection) + { + int[] array = new int[collection.size()]; + Iterator iterator = collection.iterator(); + for (int i = 0; iterator.hasNext(); i++) + array[i] = iterator.next(); + return array; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java deleted file mode 100644 index 4b8c7f16..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java +++ /dev/null @@ -1,59 +0,0 @@ -package de.juplo.kafka.chat.backend.domain; - -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.util.HashSet; -import java.util.Set; -import java.util.UUID; -import java.util.stream.Collectors; - - -@Slf4j -public class ShardedChatHome implements ChatHome -{ - private final ChatHome[] chatHomes; - private final Set ownedShards; - private final ShardingStrategy shardingStrategy; - - - public ShardedChatHome( - ChatHome[] chatHomes, - ShardingStrategy shardingStrategy) - { - this.chatHomes = chatHomes; - this.shardingStrategy = shardingStrategy; - this.ownedShards = new HashSet<>(); - for (int shard = 0; shard < chatHomes.length; shard++) - if(chatHomes[shard] != null) - this.ownedShards.add(shard); - log.info( - "Created ShardedChatHome for shards: {}", - ownedShards - .stream() - .map(String::valueOf) - .collect(Collectors.joining(", "))); - } - - - @Override - public Mono getChatRoom(UUID id) - { - return chatHomes[selectShard(id)].getChatRoom(id); - } - - @Override - public Flux getChatRooms() - { - return Flux - .fromIterable(ownedShards) - .flatMap(shard -> chatHomes[shard].getChatRooms()); - } - - - private int selectShard(UUID chatroomId) - { - return shardingStrategy.selectShard(chatroomId); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java deleted file mode 100644 index 11542edd..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java +++ /dev/null @@ -1,43 +0,0 @@ -package de.juplo.kafka.chat.backend.domain; - -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.util.*; - - -@Slf4j -public class SimpleChatHome implements ChatHome -{ - private final ChatHomeService service; - private final int shard; - - - public SimpleChatHome(ChatHomeService service, int shard) - { - log.info("Created SimpleChatHome for shard {}", shard); - this.service = service; - this.shard = shard; - } - - public SimpleChatHome(ChatHomeService service) - { - this(service, 0); - } - - - @Override - public Mono getChatRoom(UUID id) - { - return service - .getChatRoom(shard, id) - .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); - } - - @Override - public Flux getChatRooms() - { - return service.getChatRooms(shard); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/UnknownChatroomException.java b/src/main/java/de/juplo/kafka/chat/backend/domain/UnknownChatroomException.java index 1f70f110..714c2207 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/UnknownChatroomException.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/UnknownChatroomException.java @@ -2,17 +2,43 @@ package de.juplo.kafka.chat.backend.domain; import lombok.Getter; +import java.util.Arrays; +import java.util.Optional; import java.util.UUID; +import java.util.stream.Collectors; -public class UnknownChatroomException extends RuntimeException +public class UnknownChatroomException extends IllegalStateException { @Getter private final UUID chatroomId; + @Getter + private final Optional shard; + @Getter + private final Optional ownedShards; public UnknownChatroomException(UUID chatroomId) { super("Chatroom does not exist: " + chatroomId); this.chatroomId = chatroomId; + this.shard = Optional.empty(); + this.ownedShards = Optional.empty(); + } + + public UnknownChatroomException(UUID chatroomId, int shard, int[] ownedShards) + { + super( + "Chatroom does not exist (here): " + + chatroomId + + " shard=" + + shard + + ", owned=" + + Arrays + .stream(ownedShards) + .mapToObj(ownedShard -> Integer.toString(ownedShard)) + .collect(Collectors.joining(","))); + this.chatroomId = chatroomId; + this.shard = Optional.of(shard); + this.ownedShards = Optional.of(ownedShards); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java index 8f262a0b..25a9bcf6 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java @@ -1,7 +1,6 @@ package de.juplo.kafka.chat.backend.persistence.inmemory; -import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.domain.*; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -13,31 +12,39 @@ import java.util.*; public class InMemoryChatHomeService implements ChatHomeService { private final Map[] chatrooms; + private final Set ownedShards; + private final ShardingStrategy shardingStrategy; public InMemoryChatHomeService( int numShards, int[] ownedShards, + ShardingStrategy shardingStrategy, Flux chatroomFlux) { log.debug("Creating InMemoryChatHomeService"); + this.chatrooms = new Map[numShards]; - Set owned = Arrays + + this.ownedShards = Arrays .stream(ownedShards) .collect( () -> new HashSet<>(), (set, i) -> set.add(i), (a, b) -> a.addAll(b)); + + this.shardingStrategy = shardingStrategy; + for (int shard = 0; shard < numShards; shard++) { - chatrooms[shard] = owned.contains(shard) + chatrooms[shard] = this.ownedShards.contains(shard) ? new HashMap<>() : null; } chatroomFlux .filter(chatRoom -> { - if (owned.contains(chatRoom.getShard())) + if (this.ownedShards.contains(chatRoom.getShard())) { return true; } @@ -51,20 +58,43 @@ public class InMemoryChatHomeService implements ChatHomeService .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom)); } - public void putChatRoom(ChatRoom chatRoom) + void putChatRoom(ChatRoom chatRoom) { - chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom); + UUID id = chatRoom.getId(); + int shard = shardingStrategy.selectShard(id); + if (!ownedShards.contains(shard)) + throw new ShardNotOwnedException(this, chatRoom, shard, ownedShards); + chatrooms[shard].put(id, chatRoom); } @Override - public Mono getChatRoom(int shard, UUID id) + public Mono getChatRoom(UUID id) { - return Mono.justOrEmpty(chatrooms[shard].get(id)); + int shard = shardingStrategy.selectShard(id); + if (ownedShards.contains(shard)) + { + return Mono.justOrEmpty(chatrooms[shard].get(id)); + } + else + { + int[] ownedShards = new int[this.ownedShards.size()]; + Iterator iterator = this.ownedShards.iterator(); + for (int i = 0; iterator.hasNext(); i++) + { + ownedShards[i] = iterator.next(); + } + return Mono.error(new UnknownChatroomException( + id, + shard, + ownedShards)); + } } @Override - public Flux getChatRooms(int shard) + public Flux getChatRooms() { - return Flux.fromStream(chatrooms[shard].values().stream()); + return Flux + .fromIterable(ownedShards) + .flatMap(shard -> Flux.fromIterable(chatrooms[shard].values())); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java index de504485..cb1a0703 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java @@ -2,11 +2,9 @@ package de.juplo.kafka.chat.backend.persistence.inmemory; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType; -import de.juplo.kafka.chat.backend.domain.ShardedChatHome; import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.domain.ChatHome; -import de.juplo.kafka.chat.backend.domain.SimpleChatHome; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; @@ -24,38 +22,9 @@ import java.time.Clock; public class InMemoryServicesConfiguration { @Bean - @ConditionalOnProperty( - prefix = "chat.backend.inmemory", - name = "sharding-strategy", - havingValue = "none", - matchIfMissing = true) - ChatHome noneShardingChatHome(InMemoryChatHomeService chatHomeService) - { - return new SimpleChatHome(chatHomeService); - } - - @Bean - @ConditionalOnProperty( - prefix = "chat.backend.inmemory", - name = "sharding-strategy", - havingValue = "kafkalike") - ChatHome kafkalikeShardingChatHome( - ChatBackendProperties properties, - InMemoryChatHomeService chatHomeService, - StorageStrategy storageStrategy) + ChatHome chatHome(InMemoryChatHomeService chatHomeService) { - int numShards = properties.getInmemory().getNumShards(); - SimpleChatHome[] chatHomes = new SimpleChatHome[numShards]; - storageStrategy - .read() - .subscribe(chatRoom -> - { - int shard = chatRoom.getShard(); - if (chatHomes[shard] == null) - chatHomes[shard] = new SimpleChatHome(chatHomeService, shard); - }); - ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); - return new ShardedChatHome(chatHomes, strategy); + return new ChatHome(chatHomeService); } @Bean @@ -65,15 +34,31 @@ public class InMemoryServicesConfiguration { ShardingStrategyType sharding = properties.getInmemory().getShardingStrategy(); - int numShards = sharding == ShardingStrategyType.none - ? 1 - : properties.getInmemory().getNumShards(); - int[] ownedShards = sharding == ShardingStrategyType.none - ? new int[] { 0 } - : properties.getInmemory().getOwnedShards(); + + int numShards; + int[] ownedShards; + ShardingStrategy shardingStrategy; + + switch (sharding) + { + case none: + numShards = 1; + ownedShards = new int[] { 0 }; + shardingStrategy = id -> 0; + break; + case kafkalike: + numShards = properties.getInmemory().getNumShards(); + ownedShards = properties.getInmemory().getOwnedShards(); + shardingStrategy = new KafkaLikeShardingStrategy(numShards); + break; + default: + throw new IllegalArgumentException("Unknown sharding strategy: " + sharding); + } + return new InMemoryChatHomeService( numShards, ownedShards, + shardingStrategy, storageStrategy.read()); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java index b72294d9..b80e655f 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java @@ -39,7 +39,7 @@ public class ChatBackendControllerTest { // Given UUID chatroomId = UUID.randomUUID(); - when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); + when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty()); // When WebTestClient.ResponseSpec responseSpec = client @@ -59,7 +59,7 @@ public class ChatBackendControllerTest { // Given UUID chatroomId = UUID.randomUUID(); - when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); + when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty()); // When WebTestClient.ResponseSpec responseSpec = client @@ -80,7 +80,7 @@ public class ChatBackendControllerTest UUID chatroomId = UUID.randomUUID(); String username = "foo"; Long messageId = 66l; - when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); + when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty()); // When WebTestClient.ResponseSpec responseSpec = client @@ -106,7 +106,7 @@ public class ChatBackendControllerTest UUID chatroomId = UUID.randomUUID(); String username = "foo"; Long messageId = 66l; - when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); + when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty()); // When WebTestClient.ResponseSpec responseSpec = client @@ -129,7 +129,7 @@ public class ChatBackendControllerTest { // Given UUID chatroomId = UUID.randomUUID(); - when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); + when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty()); // When WebTestClient.ResponseSpec responseSpec = client @@ -173,7 +173,7 @@ public class ChatBackendControllerTest 0, Clock.systemDefaultZone(), chatRoomService, 8); - when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom)); + when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.just(chatRoom)); Message existingMessage = new Message( key, serialNumberExistingMessage, @@ -225,7 +225,7 @@ public class ChatBackendControllerTest 0, Clock.systemDefaultZone(), chatRoomService, 8); - when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))) + when(chatHomeService.getChatRoom(any(UUID.class))) .thenReturn(Mono.just(chatRoom)); when(chatRoomService.getMessage(any(Message.MessageKey.class))) .thenReturn(Mono.empty()); diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java new file mode 100644 index 00000000..5d9dfafc --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java @@ -0,0 +1,56 @@ +package de.juplo.kafka.chat.backend.domain; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; + +import java.time.Clock; +import java.util.UUID; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static pl.rzrz.assertj.reactor.Assertions.assertThat; + + +public class ChatHomeTest +{ + @Test + @DisplayName("Assert chatroom is delivered, if it exists") + void testGetExistingChatroom() + { + // Given + ChatHomeService chatHomeService = mock(ChatHomeService.class); + ChatRoom chatRoom = new ChatRoom( + UUID.randomUUID(), + "Foo", + 0, + Clock.systemDefaultZone(), + mock(ChatRoomService.class), + 8); + when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.just(chatRoom)); + ChatHome chatHome = new ChatHome(chatHomeService); + + // When + Mono mono = chatHome.getChatRoom(chatRoom.getId()); + + // Then + assertThat(mono).emitsExactly(chatRoom); + } + + @Test + @DisplayName("Assert UnknownChatroomException is thrown, if chatroom does not exist") + void testGetNonExistentChatroom() + { + // Given + ChatHomeService chatHomeService = mock(ChatHomeService.class); + when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty()); + ChatHome chatHome = new ChatHome(chatHomeService); + + // When + Mono mono = chatHome.getChatRoom(UUID.randomUUID()); + + // Then + assertThat(mono).sendsError(); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java deleted file mode 100644 index 5b53607f..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java +++ /dev/null @@ -1,57 +0,0 @@ -package de.juplo.kafka.chat.backend.domain; - -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import reactor.core.publisher.Mono; - -import java.time.Clock; -import java.util.UUID; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static pl.rzrz.assertj.reactor.Assertions.assertThat; - - -public class SimpleChatHomeTest -{ - @Test - @DisplayName("Assert chatroom is delivered, if it exists") - void testGetExistingChatroom() - { - // Given - ChatHomeService chatHomeService = mock(ChatHomeService.class); - ChatRoom chatRoom = new ChatRoom( - UUID.randomUUID(), - "Foo", - 0, - Clock.systemDefaultZone(), - mock(ChatRoomService.class), - 8); - when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom)); - SimpleChatHome chatHome = new SimpleChatHome(chatHomeService); - - // When - Mono mono = chatHome.getChatRoom(chatRoom.getId()); - - // Then - assertThat(mono).emitsExactly(chatRoom); - } - - @Test - @DisplayName("Assert UnknownChatroomException is thrown, if chatroom does not exist") - void testGetNonExistentChatroom() - { - // Given - ChatHomeService chatHomeService = mock(ChatHomeService.class); - when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); - SimpleChatHome chatHome = new SimpleChatHome(chatHomeService); - - // When - Mono mono = chatHome.getChatRoom(UUID.randomUUID()); - - // Then - assertThat(mono).sendsError(); - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java index dd76324c..a68445b6 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java @@ -24,6 +24,7 @@ public abstract class AbstractInMemoryStorageIT extends AbstractStorageStrategyI InMemoryChatHomeService chatHomeService = new InMemoryChatHomeService( 1, new int[] { 0 }, + id -> 0, getStorageStrategy().read()); InMemoryChatRoomFactory chatRoomFactory = new InMemoryChatRoomFactory( diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java index 3ce527eb..83e905b9 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java @@ -23,7 +23,7 @@ public abstract class AbstractStorageStrategyIT protected void start() { StorageStrategyITConfig config = getConfig(); - chathome = new SimpleChatHome(config.getChatHomeService()); + chathome = new ChatHome(config.getChatHomeService()); chatRoomFactory = config.getChatRoomFactory(); }