From: Kai Moritz Date: Wed, 6 Sep 2023 21:41:27 +0000 (+0200) Subject: refactor: `ChatHome` is a service - Moved classes X-Git-Tag: rebase--2024-02-20--15-07~64 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=f1cfd4e843c1da682a6885b1ec02595e34e0e809;p=demos%2Fkafka%2Fchat refactor: `ChatHome` is a service - Moved classes --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java deleted file mode 100644 index 2ff59cb3..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java +++ /dev/null @@ -1,18 +0,0 @@ -package de.juplo.kafka.chat.backend.domain; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.util.UUID; - - -public interface ChatHome -{ - Mono createChatRoom(UUID id, String name); - - Mono getChatRoomInfo(UUID id); - - Flux getChatRoomInfo(); - - Mono getChatRoomData(UUID id); -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java new file mode 100644 index 00000000..2ff59cb3 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java @@ -0,0 +1,18 @@ +package de.juplo.kafka.chat.backend.domain; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.UUID; + + +public interface ChatHome +{ + Mono createChatRoom(UUID id, String name); + + Mono getChatRoomInfo(UUID id); + + Flux getChatRoomInfo(); + + Mono getChatRoomData(UUID id); +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java deleted file mode 100644 index 4c8b2faa..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java +++ /dev/null @@ -1,106 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.inmemory; - -import de.juplo.kafka.chat.backend.domain.*; -import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; -import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException; -import de.juplo.kafka.chat.backend.persistence.ShardingStrategy; -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.util.HashSet; -import java.util.Set; -import java.util.UUID; -import java.util.stream.Collectors; - - -@Slf4j -public class ShardedChatHome implements ChatHome -{ - private final SimpleChatHome[] chatHomes; - private final Set ownedShards; - private final ShardingStrategy shardingStrategy; - - - public ShardedChatHome( - SimpleChatHome[] chatHomes, - ShardingStrategy shardingStrategy) - { - this.chatHomes = chatHomes; - this.shardingStrategy = shardingStrategy; - this.ownedShards = new HashSet<>(); - for (int shard = 0; shard < chatHomes.length; shard++) - if(chatHomes[shard] != null) - this.ownedShards.add(shard); - log.info( - "Created ShardedChatHome for shards: {}", - ownedShards - .stream() - .map(String::valueOf) - .collect(Collectors.joining(", "))); - } - - - @Override - public Mono createChatRoom(UUID id, String name) - { - int shard = shardingStrategy.selectShard(id); - return chatHomes[shard] == null - ? Mono.error(new ShardNotOwnedException(shard)) - : chatHomes[shard].createChatRoom(id, name); - } - - @Override - public Mono getChatRoomInfo(UUID id) - { - int shard = selectShard(id); - return chatHomes[shard] == null - ? Mono.error(new ShardNotOwnedException(shard)) - : chatHomes[shard] - .getChatRoomInfo(id) - .onErrorMap(throwable -> throwable instanceof UnknownChatroomException - ? new UnknownChatroomException( - id, - shard, - ownedShards.stream().mapToInt(i -> i.intValue()).toArray()) - : throwable); - } - - @Override - public Flux getChatRoomInfo() - { - return Flux - .fromIterable(ownedShards) - .flatMap(shard -> chatHomes[shard].getChatRoomInfo()); - } - - @Override - public Mono getChatRoomData(UUID id) - { - int shard = selectShard(id); - return chatHomes[shard] == null - ? Mono.error(new ShardNotOwnedException(shard)) - : chatHomes[shard] - .getChatRoomData(id) - .onErrorMap(throwable -> throwable instanceof UnknownChatroomException - ? new UnknownChatroomException( - id, - shard, - ownedShards.stream().mapToInt(i -> i.intValue()).toArray()) - : throwable); - } - - public Flux getChatRoomData() - { - return Flux - .fromIterable(ownedShards) - .flatMap(shard -> chatHomes[shard].getChatRoomData()); - } - - - - private int selectShard(UUID chatroomId) - { - return shardingStrategy.selectShard(chatroomId); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeService.java new file mode 100644 index 00000000..4c8b2faa --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeService.java @@ -0,0 +1,106 @@ +package de.juplo.kafka.chat.backend.persistence.inmemory; + +import de.juplo.kafka.chat.backend.domain.*; +import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; +import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException; +import de.juplo.kafka.chat.backend.persistence.ShardingStrategy; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + + +@Slf4j +public class ShardedChatHome implements ChatHome +{ + private final SimpleChatHome[] chatHomes; + private final Set ownedShards; + private final ShardingStrategy shardingStrategy; + + + public ShardedChatHome( + SimpleChatHome[] chatHomes, + ShardingStrategy shardingStrategy) + { + this.chatHomes = chatHomes; + this.shardingStrategy = shardingStrategy; + this.ownedShards = new HashSet<>(); + for (int shard = 0; shard < chatHomes.length; shard++) + if(chatHomes[shard] != null) + this.ownedShards.add(shard); + log.info( + "Created ShardedChatHome for shards: {}", + ownedShards + .stream() + .map(String::valueOf) + .collect(Collectors.joining(", "))); + } + + + @Override + public Mono createChatRoom(UUID id, String name) + { + int shard = shardingStrategy.selectShard(id); + return chatHomes[shard] == null + ? Mono.error(new ShardNotOwnedException(shard)) + : chatHomes[shard].createChatRoom(id, name); + } + + @Override + public Mono getChatRoomInfo(UUID id) + { + int shard = selectShard(id); + return chatHomes[shard] == null + ? Mono.error(new ShardNotOwnedException(shard)) + : chatHomes[shard] + .getChatRoomInfo(id) + .onErrorMap(throwable -> throwable instanceof UnknownChatroomException + ? new UnknownChatroomException( + id, + shard, + ownedShards.stream().mapToInt(i -> i.intValue()).toArray()) + : throwable); + } + + @Override + public Flux getChatRoomInfo() + { + return Flux + .fromIterable(ownedShards) + .flatMap(shard -> chatHomes[shard].getChatRoomInfo()); + } + + @Override + public Mono getChatRoomData(UUID id) + { + int shard = selectShard(id); + return chatHomes[shard] == null + ? Mono.error(new ShardNotOwnedException(shard)) + : chatHomes[shard] + .getChatRoomData(id) + .onErrorMap(throwable -> throwable instanceof UnknownChatroomException + ? new UnknownChatroomException( + id, + shard, + ownedShards.stream().mapToInt(i -> i.intValue()).toArray()) + : throwable); + } + + public Flux getChatRoomData() + { + return Flux + .fromIterable(ownedShards) + .flatMap(shard -> chatHomes[shard].getChatRoomData()); + } + + + + private int selectShard(UUID chatroomId) + { + return shardingStrategy.selectShard(chatroomId); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java deleted file mode 100644 index 868c01e8..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java +++ /dev/null @@ -1,122 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.inmemory; - -import de.juplo.kafka.chat.backend.domain.*; -import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException; -import de.juplo.kafka.chat.backend.persistence.StorageStrategy; -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.time.Clock; -import java.util.*; - - -@Slf4j -public class SimpleChatHome implements ChatHome -{ - private final Integer shard; - private final Map chatRoomInfo; - private final Map chatRoomData; - private final Clock clock; - private final int bufferSize; - - - - public SimpleChatHome( - StorageStrategy storageStrategy, - Clock clock, - int bufferSize) - { - this( - null, - storageStrategy, - clock, - bufferSize); - } - - public SimpleChatHome( - Integer shard, - StorageStrategy storageStrategy, - Clock clock, - int bufferSize) - { - log.info("Created SimpleChatHome for shard {}", shard); -; - this.shard = shard; - this.chatRoomInfo = new HashMap<>(); - this.chatRoomData = new HashMap<>(); - storageStrategy - .readChatRoomInfo() - .filter(info -> - { - if (shard == null || info.getShard() == shard) - { - return true; - } - else - { - log.info( - "SimpleChatHome for shard {} ignores not owned chat-room {}", - shard, - info); - return false; - } - }) - .toStream() - .forEach(info -> - { - UUID chatRoomId = info.getId(); - chatRoomInfo.put(chatRoomId, info); - Flux messageFlux = - storageStrategy.readChatRoomData(chatRoomId); - chatRoomData.put( - info.getId(), - new ChatRoomData( - clock, - new InMemoryChatRoomService(messageFlux), - bufferSize)); - }); - this.clock = clock; - this.bufferSize = bufferSize; - } - - - @Override - public Mono createChatRoom(UUID id, String name) - { - log.info("Creating ChatRoom with buffer-size {}", bufferSize); - ChatRoomService service = new InMemoryChatRoomService(Flux.empty()); - ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard); - this.chatRoomInfo.put(id, chatRoomInfo); - ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize); - this.chatRoomData.put(id, chatRoomData); - return Mono.just(chatRoomInfo); - } - - @Override - public Mono getChatRoomInfo(UUID id) - { - return Mono - .justOrEmpty(chatRoomInfo.get(id)) - .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); - } - - @Override - public Flux getChatRoomInfo() - { - return Flux.fromIterable(chatRoomInfo.values()); - } - - @Override - public Mono getChatRoomData(UUID id) - { - return Mono - .justOrEmpty(chatRoomData.get(id)) - .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); - } - - public Flux getChatRoomData() - { - return Flux.fromIterable(chatRoomData.values()); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeService.java new file mode 100644 index 00000000..868c01e8 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeService.java @@ -0,0 +1,122 @@ +package de.juplo.kafka.chat.backend.persistence.inmemory; + +import de.juplo.kafka.chat.backend.domain.*; +import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException; +import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.Clock; +import java.util.*; + + +@Slf4j +public class SimpleChatHome implements ChatHome +{ + private final Integer shard; + private final Map chatRoomInfo; + private final Map chatRoomData; + private final Clock clock; + private final int bufferSize; + + + + public SimpleChatHome( + StorageStrategy storageStrategy, + Clock clock, + int bufferSize) + { + this( + null, + storageStrategy, + clock, + bufferSize); + } + + public SimpleChatHome( + Integer shard, + StorageStrategy storageStrategy, + Clock clock, + int bufferSize) + { + log.info("Created SimpleChatHome for shard {}", shard); +; + this.shard = shard; + this.chatRoomInfo = new HashMap<>(); + this.chatRoomData = new HashMap<>(); + storageStrategy + .readChatRoomInfo() + .filter(info -> + { + if (shard == null || info.getShard() == shard) + { + return true; + } + else + { + log.info( + "SimpleChatHome for shard {} ignores not owned chat-room {}", + shard, + info); + return false; + } + }) + .toStream() + .forEach(info -> + { + UUID chatRoomId = info.getId(); + chatRoomInfo.put(chatRoomId, info); + Flux messageFlux = + storageStrategy.readChatRoomData(chatRoomId); + chatRoomData.put( + info.getId(), + new ChatRoomData( + clock, + new InMemoryChatRoomService(messageFlux), + bufferSize)); + }); + this.clock = clock; + this.bufferSize = bufferSize; + } + + + @Override + public Mono createChatRoom(UUID id, String name) + { + log.info("Creating ChatRoom with buffer-size {}", bufferSize); + ChatRoomService service = new InMemoryChatRoomService(Flux.empty()); + ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard); + this.chatRoomInfo.put(id, chatRoomInfo); + ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize); + this.chatRoomData.put(id, chatRoomData); + return Mono.just(chatRoomInfo); + } + + @Override + public Mono getChatRoomInfo(UUID id) + { + return Mono + .justOrEmpty(chatRoomInfo.get(id)) + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); + } + + @Override + public Flux getChatRoomInfo() + { + return Flux.fromIterable(chatRoomInfo.values()); + } + + @Override + public Mono getChatRoomData(UUID id) + { + return Mono + .justOrEmpty(chatRoomData.get(id)) + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); + } + + public Flux getChatRoomData() + { + return Flux.fromIterable(chatRoomData.values()); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java deleted file mode 100644 index 86b32707..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java +++ /dev/null @@ -1,72 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.domain.ChatHome; -import de.juplo.kafka.chat.backend.domain.ChatRoomData; -import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; -import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.utils.Utils; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.util.*; - - -@RequiredArgsConstructor -@Slf4j -public class KafkaChatHome implements ChatHome -{ - private final int numPartitions; - private final ChatRoomChannel chatRoomChannel; - - - - @Override - public Mono createChatRoom(UUID id, String name) - { - log.info("Sending create-command for chat rooom: id={}, name={}"); - return chatRoomChannel.sendCreateChatRoomRequest(id, name); - } - - @Override - public Mono getChatRoomInfo(UUID id) - { - int shard = selectShard(id); - return chatRoomChannel - .getChatRoomInfo(shard, id) - .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( - id, - shard, - chatRoomChannel.getOwnedShards()))); - } - - @Override - public Flux getChatRoomInfo() - { - return chatRoomChannel.getChatRoomInfo(); - } - - @Override - public Mono getChatRoomData(UUID id) - { - int shard = selectShard(id); - return chatRoomChannel - .getChatRoomData(shard, id) - .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( - id, - shard, - chatRoomChannel.getOwnedShards()))); - } - - public Flux getChatRoomData() - { - return chatRoomChannel.getChatRoomData(); - } - - int selectShard(UUID chatRoomId) - { - byte[] serializedKey = chatRoomId.toString().getBytes(); - return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions; - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java new file mode 100644 index 00000000..86b32707 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java @@ -0,0 +1,72 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatHome; +import de.juplo.kafka.chat.backend.domain.ChatRoomData; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.utils.Utils; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.*; + + +@RequiredArgsConstructor +@Slf4j +public class KafkaChatHome implements ChatHome +{ + private final int numPartitions; + private final ChatRoomChannel chatRoomChannel; + + + + @Override + public Mono createChatRoom(UUID id, String name) + { + log.info("Sending create-command for chat rooom: id={}, name={}"); + return chatRoomChannel.sendCreateChatRoomRequest(id, name); + } + + @Override + public Mono getChatRoomInfo(UUID id) + { + int shard = selectShard(id); + return chatRoomChannel + .getChatRoomInfo(shard, id) + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( + id, + shard, + chatRoomChannel.getOwnedShards()))); + } + + @Override + public Flux getChatRoomInfo() + { + return chatRoomChannel.getChatRoomInfo(); + } + + @Override + public Mono getChatRoomData(UUID id) + { + int shard = selectShard(id); + return chatRoomChannel + .getChatRoomData(shard, id) + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( + id, + shard, + chatRoomChannel.getOwnedShards()))); + } + + public Flux getChatRoomData() + { + return chatRoomChannel.getChatRoomData(); + } + + int selectShard(UUID chatRoomId) + { + byte[] serializedKey = chatRoomId.toString().getBytes(); + return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions; + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceTest.java new file mode 100644 index 00000000..62826432 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceTest.java @@ -0,0 +1,68 @@ +package de.juplo.kafka.chat.backend.domain; + +import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; +import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.junit.jupiter.SpringExtension; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; + +import java.time.Duration; +import java.util.UUID; + +import static pl.rzrz.assertj.reactor.Assertions.assertThat; + + +@ExtendWith(SpringExtension.class) +public abstract class ChatHomeTest +{ + @Autowired + ChatHome chatHome; + + + @Test + @DisplayName("Assert chatroom is delivered, if it exists") + void testGetExistingChatroom() + { + // Given + UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); + + // When + Mono mono = Mono + .defer(() -> chatHome.getChatRoomData(chatRoomId)) + .log("testGetExistingChatroom") + .retryWhen(Retry + .backoff(5, Duration.ofSeconds(1)) + .filter(throwable -> throwable instanceof LoadInProgressException)); + + // Then + assertThat(mono).emitsCount(1); + } + + @Test + @DisplayName("Assert UnknownChatroomException is thrown, if chatroom does not exist") + void testGetNonExistentChatroom() + { + // Given + UUID chatRoomId = UUID.fromString("7f59ec77-832e-4a17-8d22-55ef46242c17"); + + // When + Mono mono = Mono + .defer(() -> chatHome.getChatRoomData(chatRoomId)) + .log("testGetNonExistentChatroom") + .retryWhen(Retry + .backoff(5, Duration.ofSeconds(1)) + .filter(throwable -> throwable instanceof LoadInProgressException)); + + // Then + assertThat(mono).sendsError(e -> + { + assertThat(e).isInstanceOf(UnknownChatroomException.class); + UnknownChatroomException unknownChatroomException = (UnknownChatroomException) e; + assertThat(unknownChatroomException.getChatroomId()).isEqualTo(chatRoomId); + }); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceWithShardsTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceWithShardsTest.java new file mode 100644 index 00000000..65a67b99 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceWithShardsTest.java @@ -0,0 +1,46 @@ +package de.juplo.kafka.chat.backend.domain; + +import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; +import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; + +import java.time.Duration; +import java.util.UUID; + +import static pl.rzrz.assertj.reactor.Assertions.assertThat; + + +public abstract class ChatHomeWithShardsTest extends ChatHomeTest +{ + public static final int NUM_SHARDS = 10; + public static final int OWNED_SHARD = 2; + public static final int NOT_OWNED_SHARD = 0; + + + @Test + @DisplayName("Assert ShardNotOwnedException is thrown, if the shard for the chatroom is not owned") + void testGetChatroomForNotOwnedShard() + { + // Given + UUID chatRoomId = UUID.fromString("4e7246a6-29ae-43ea-b56f-669c3481ac19"); + + // When + Mono mono = Mono + .defer(() -> chatHome.getChatRoomData(chatRoomId)) + .log("testGetChatroomForNotOwnedShard") + .retryWhen(Retry + .backoff(5, Duration.ofSeconds(1)) + .filter(throwable -> throwable instanceof LoadInProgressException)); + + // Then + assertThat(mono).sendsError(e -> + { + assertThat(e).isInstanceOf(ShardNotOwnedException.class); + ShardNotOwnedException shardNotOwnedException = (ShardNotOwnedException) e; + assertThat(shardNotOwnedException.getShard()).isEqualTo(NOT_OWNED_SHARD); + }); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java deleted file mode 100644 index 62826432..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java +++ /dev/null @@ -1,68 +0,0 @@ -package de.juplo.kafka.chat.backend.domain; - -import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; -import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.test.context.junit.jupiter.SpringExtension; -import reactor.core.publisher.Mono; -import reactor.util.retry.Retry; - -import java.time.Duration; -import java.util.UUID; - -import static pl.rzrz.assertj.reactor.Assertions.assertThat; - - -@ExtendWith(SpringExtension.class) -public abstract class ChatHomeTest -{ - @Autowired - ChatHome chatHome; - - - @Test - @DisplayName("Assert chatroom is delivered, if it exists") - void testGetExistingChatroom() - { - // Given - UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); - - // When - Mono mono = Mono - .defer(() -> chatHome.getChatRoomData(chatRoomId)) - .log("testGetExistingChatroom") - .retryWhen(Retry - .backoff(5, Duration.ofSeconds(1)) - .filter(throwable -> throwable instanceof LoadInProgressException)); - - // Then - assertThat(mono).emitsCount(1); - } - - @Test - @DisplayName("Assert UnknownChatroomException is thrown, if chatroom does not exist") - void testGetNonExistentChatroom() - { - // Given - UUID chatRoomId = UUID.fromString("7f59ec77-832e-4a17-8d22-55ef46242c17"); - - // When - Mono mono = Mono - .defer(() -> chatHome.getChatRoomData(chatRoomId)) - .log("testGetNonExistentChatroom") - .retryWhen(Retry - .backoff(5, Duration.ofSeconds(1)) - .filter(throwable -> throwable instanceof LoadInProgressException)); - - // Then - assertThat(mono).sendsError(e -> - { - assertThat(e).isInstanceOf(UnknownChatroomException.class); - UnknownChatroomException unknownChatroomException = (UnknownChatroomException) e; - assertThat(unknownChatroomException.getChatroomId()).isEqualTo(chatRoomId); - }); - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTest.java deleted file mode 100644 index 65a67b99..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTest.java +++ /dev/null @@ -1,46 +0,0 @@ -package de.juplo.kafka.chat.backend.domain; - -import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; -import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import reactor.core.publisher.Mono; -import reactor.util.retry.Retry; - -import java.time.Duration; -import java.util.UUID; - -import static pl.rzrz.assertj.reactor.Assertions.assertThat; - - -public abstract class ChatHomeWithShardsTest extends ChatHomeTest -{ - public static final int NUM_SHARDS = 10; - public static final int OWNED_SHARD = 2; - public static final int NOT_OWNED_SHARD = 0; - - - @Test - @DisplayName("Assert ShardNotOwnedException is thrown, if the shard for the chatroom is not owned") - void testGetChatroomForNotOwnedShard() - { - // Given - UUID chatRoomId = UUID.fromString("4e7246a6-29ae-43ea-b56f-669c3481ac19"); - - // When - Mono mono = Mono - .defer(() -> chatHome.getChatRoomData(chatRoomId)) - .log("testGetChatroomForNotOwnedShard") - .retryWhen(Retry - .backoff(5, Duration.ofSeconds(1)) - .filter(throwable -> throwable instanceof LoadInProgressException)); - - // Then - assertThat(mono).sendsError(e -> - { - assertThat(e).isInstanceOf(ShardNotOwnedException.class); - ShardNotOwnedException shardNotOwnedException = (ShardNotOwnedException) e; - assertThat(shardNotOwnedException.getShard()).isEqualTo(NOT_OWNED_SHARD); - }); - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeServiceTest.java new file mode 100644 index 00000000..8d9036f0 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeServiceTest.java @@ -0,0 +1,65 @@ +package de.juplo.kafka.chat.backend.persistence.inmemory; + +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest; +import de.juplo.kafka.chat.backend.persistence.ShardingStrategy; +import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; + +import java.nio.file.Paths; +import java.time.Clock; +import java.util.stream.IntStream; + +public class ShardedChatHomeTest extends ChatHomeWithShardsTest +{ + @TestConfiguration + static class Configuration + { + @Bean + ShardedChatHome chatHome( + StorageStrategy storageStrategy, + Clock clock) + { + SimpleChatHome[] chatHomes = new SimpleChatHome[NUM_SHARDS]; + + IntStream + .of(ownedShards()) + .forEach(shard -> chatHomes[shard] = new SimpleChatHome( + shard, + storageStrategy, + clock, + bufferSize())); + + ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS); + + return new ShardedChatHome(chatHomes, strategy); + } + + @Bean + public FilesStorageStrategy storageStrategy(Clock clock) + { + return new FilesStorageStrategy( + Paths.get("target", "test-classes", "data", "files"), + new KafkaLikeShardingStrategy(NUM_SHARDS), + new ObjectMapper()); + } + + @Bean + Clock clock() + { + return Clock.systemDefaultZone(); + } + + int[] ownedShards() + { + return new int[] { OWNED_SHARD }; + } + + int bufferSize() + { + return 8; + } + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java deleted file mode 100644 index 8d9036f0..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java +++ /dev/null @@ -1,65 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.inmemory; - -import com.fasterxml.jackson.databind.ObjectMapper; -import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest; -import de.juplo.kafka.chat.backend.persistence.ShardingStrategy; -import de.juplo.kafka.chat.backend.persistence.StorageStrategy; -import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; - -import java.nio.file.Paths; -import java.time.Clock; -import java.util.stream.IntStream; - -public class ShardedChatHomeTest extends ChatHomeWithShardsTest -{ - @TestConfiguration - static class Configuration - { - @Bean - ShardedChatHome chatHome( - StorageStrategy storageStrategy, - Clock clock) - { - SimpleChatHome[] chatHomes = new SimpleChatHome[NUM_SHARDS]; - - IntStream - .of(ownedShards()) - .forEach(shard -> chatHomes[shard] = new SimpleChatHome( - shard, - storageStrategy, - clock, - bufferSize())); - - ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS); - - return new ShardedChatHome(chatHomes, strategy); - } - - @Bean - public FilesStorageStrategy storageStrategy(Clock clock) - { - return new FilesStorageStrategy( - Paths.get("target", "test-classes", "data", "files"), - new KafkaLikeShardingStrategy(NUM_SHARDS), - new ObjectMapper()); - } - - @Bean - Clock clock() - { - return Clock.systemDefaultZone(); - } - - int[] ownedShards() - { - return new int[] { OWNED_SHARD }; - } - - int bufferSize() - { - return 8; - } - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeServiceTest.java new file mode 100644 index 00000000..8be31731 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeServiceTest.java @@ -0,0 +1,50 @@ +package de.juplo.kafka.chat.backend.persistence.inmemory; + +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.chat.backend.domain.ChatHomeTest; +import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; + +import java.nio.file.Paths; +import java.time.Clock; + + +public class SimpleChatHomeTest extends ChatHomeTest +{ + @TestConfiguration + static class Configuration + { + @Bean + SimpleChatHome chatHome( + StorageStrategy storageStrategy, + Clock clock) + { + return new SimpleChatHome( + storageStrategy, + clock, + bufferSize()); + } + + @Bean + public FilesStorageStrategy storageStrategy(Clock clock) + { + return new FilesStorageStrategy( + Paths.get("target", "test-classes", "data", "files"), + chatRoomId -> 0, + new ObjectMapper()); + } + + @Bean + Clock clock() + { + return Clock.systemDefaultZone(); + } + + int bufferSize() + { + return 8; + } + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java deleted file mode 100644 index 8be31731..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java +++ /dev/null @@ -1,50 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.inmemory; - -import com.fasterxml.jackson.databind.ObjectMapper; -import de.juplo.kafka.chat.backend.domain.ChatHomeTest; -import de.juplo.kafka.chat.backend.persistence.StorageStrategy; -import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; - -import java.nio.file.Paths; -import java.time.Clock; - - -public class SimpleChatHomeTest extends ChatHomeTest -{ - @TestConfiguration - static class Configuration - { - @Bean - SimpleChatHome chatHome( - StorageStrategy storageStrategy, - Clock clock) - { - return new SimpleChatHome( - storageStrategy, - clock, - bufferSize()); - } - - @Bean - public FilesStorageStrategy storageStrategy(Clock clock) - { - return new FilesStorageStrategy( - Paths.get("target", "test-classes", "data", "files"), - chatRoomId -> 0, - new ObjectMapper()); - } - - @Bean - Clock clock() - { - return Clock.systemDefaultZone(); - } - - int bufferSize() - { - return 8; - } - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeServiceTest.java new file mode 100644 index 00000000..d758a22d --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeServiceTest.java @@ -0,0 +1,113 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.ChatBackendProperties; +import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; +import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.time.Clock; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest.NUM_SHARDS; +import static de.juplo.kafka.chat.backend.persistence.kafka.KafkaChatHomeTest.TOPIC; + + +@SpringBootTest( + classes = { + KafkaChatHomeTest.KafkaChatHomeTestConfiguration.class, + KafkaServicesConfiguration.class, + KafkaAutoConfiguration.class, + TaskExecutionAutoConfiguration.class, + }, + properties = { + "chat.backend.services=kafka", + "chat.backend.kafka.client-id-PREFIX=TEST", + "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "chat.backend.kafka.chatroom-channel-topic=" + TOPIC, + "chat.backend.kafka.num-partitions=" + NUM_SHARDS, +}) +@EmbeddedKafka(topics = { TOPIC }, partitions = 10) +@Slf4j +public class KafkaChatHomeTest extends ChatHomeWithShardsTest +{ + final static String TOPIC = "KAFKA_CHAT_HOME_TEST"; + + static CompletableFuture CONSUMER_JOB; + + + @TestConfiguration + @EnableConfigurationProperties(ChatBackendProperties.class) + static class KafkaChatHomeTestConfiguration + { + @Bean + Clock clock() + { + return Clock.systemDefaultZone(); + } + } + + + @BeforeAll + public static void sendAndLoadStoredData( + @Autowired KafkaTemplate messageTemplate, + @Autowired Consumer chatRoomChannelConsumer, + @Autowired ThreadPoolTaskExecutor taskExecutor, + @Autowired ChatRoomChannel chatRoomChannel) + { + send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); + + List assignedPartitions = List.of(new TopicPartition(TOPIC, 2)); + chatRoomChannelConsumer.assign(assignedPartitions); + chatRoomChannel.onPartitionsAssigned(assignedPartitions); + CONSUMER_JOB = taskExecutor + .submitCompletable(chatRoomChannel) + .exceptionally(e -> + { + log.error("The consumer for the ChatRoomChannel exited abnormally!", e); + return null; + }); + } + + static void send(KafkaTemplate kafkaTemplate, String key, String value, String typeId) + { + ProducerRecord record = new ProducerRecord<>(TOPIC, key, value); + record.headers().add("__TypeId__", typeId.getBytes()); + SendResult result = kafkaTemplate.send(record).join(); + log.info( + "Sent {}={} to {}", + key, + value, + new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition())); + } + + @AfterAll + static void joinConsumerJob(@Autowired Consumer chatRoomChannelConsumer) + { + log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); + chatRoomChannelConsumer.wakeup(); + log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); + CONSUMER_JOB.join(); + log.info("Joined the consumer of the ChatRoomChannel"); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeTest.java deleted file mode 100644 index d758a22d..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeTest.java +++ /dev/null @@ -1,113 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.ChatBackendProperties; -import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.TopicPartition; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; -import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.support.SendResult; -import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; - -import java.time.Clock; -import java.util.List; -import java.util.concurrent.CompletableFuture; - -import static de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest.NUM_SHARDS; -import static de.juplo.kafka.chat.backend.persistence.kafka.KafkaChatHomeTest.TOPIC; - - -@SpringBootTest( - classes = { - KafkaChatHomeTest.KafkaChatHomeTestConfiguration.class, - KafkaServicesConfiguration.class, - KafkaAutoConfiguration.class, - TaskExecutionAutoConfiguration.class, - }, - properties = { - "chat.backend.services=kafka", - "chat.backend.kafka.client-id-PREFIX=TEST", - "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", - "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", - "chat.backend.kafka.chatroom-channel-topic=" + TOPIC, - "chat.backend.kafka.num-partitions=" + NUM_SHARDS, -}) -@EmbeddedKafka(topics = { TOPIC }, partitions = 10) -@Slf4j -public class KafkaChatHomeTest extends ChatHomeWithShardsTest -{ - final static String TOPIC = "KAFKA_CHAT_HOME_TEST"; - - static CompletableFuture CONSUMER_JOB; - - - @TestConfiguration - @EnableConfigurationProperties(ChatBackendProperties.class) - static class KafkaChatHomeTestConfiguration - { - @Bean - Clock clock() - { - return Clock.systemDefaultZone(); - } - } - - - @BeforeAll - public static void sendAndLoadStoredData( - @Autowired KafkaTemplate messageTemplate, - @Autowired Consumer chatRoomChannelConsumer, - @Autowired ThreadPoolTaskExecutor taskExecutor, - @Autowired ChatRoomChannel chatRoomChannel) - { - send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); - send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); - - List assignedPartitions = List.of(new TopicPartition(TOPIC, 2)); - chatRoomChannelConsumer.assign(assignedPartitions); - chatRoomChannel.onPartitionsAssigned(assignedPartitions); - CONSUMER_JOB = taskExecutor - .submitCompletable(chatRoomChannel) - .exceptionally(e -> - { - log.error("The consumer for the ChatRoomChannel exited abnormally!", e); - return null; - }); - } - - static void send(KafkaTemplate kafkaTemplate, String key, String value, String typeId) - { - ProducerRecord record = new ProducerRecord<>(TOPIC, key, value); - record.headers().add("__TypeId__", typeId.getBytes()); - SendResult result = kafkaTemplate.send(record).join(); - log.info( - "Sent {}={} to {}", - key, - value, - new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition())); - } - - @AfterAll - static void joinConsumerJob(@Autowired Consumer chatRoomChannelConsumer) - { - log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); - chatRoomChannelConsumer.wakeup(); - log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); - CONSUMER_JOB.join(); - log.info("Joined the consumer of the ChatRoomChannel"); - } -}