+++ /dev/null
-package de.juplo.kafka.chat.backend.domain;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.UUID;
-
-
-public interface ChatHome
-{
- Mono<ChatRoomInfo> createChatRoom(UUID id, String name);
-
- Mono<ChatRoomInfo> getChatRoomInfo(UUID id);
-
- Flux<ChatRoomInfo> getChatRoomInfo();
-
- Mono<ChatRoomData> getChatRoomData(UUID id);
-}
--- /dev/null
+package de.juplo.kafka.chat.backend.domain;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.UUID;
+
+
+public interface ChatHome
+{
+ Mono<ChatRoomInfo> createChatRoom(UUID id, String name);
+
+ Mono<ChatRoomInfo> getChatRoomInfo(UUID id);
+
+ Flux<ChatRoomInfo> getChatRoomInfo();
+
+ Mono<ChatRoomData> getChatRoomData(UUID id);
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.inmemory;
-
-import de.juplo.kafka.chat.backend.domain.*;
-import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
-import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
-import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
-
-@Slf4j
-public class ShardedChatHome implements ChatHome
-{
- private final SimpleChatHome[] chatHomes;
- private final Set<Integer> ownedShards;
- private final ShardingStrategy shardingStrategy;
-
-
- public ShardedChatHome(
- SimpleChatHome[] chatHomes,
- ShardingStrategy shardingStrategy)
- {
- this.chatHomes = chatHomes;
- this.shardingStrategy = shardingStrategy;
- this.ownedShards = new HashSet<>();
- for (int shard = 0; shard < chatHomes.length; shard++)
- if(chatHomes[shard] != null)
- this.ownedShards.add(shard);
- log.info(
- "Created ShardedChatHome for shards: {}",
- ownedShards
- .stream()
- .map(String::valueOf)
- .collect(Collectors.joining(", ")));
- }
-
-
- @Override
- public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
- {
- int shard = shardingStrategy.selectShard(id);
- return chatHomes[shard] == null
- ? Mono.error(new ShardNotOwnedException(shard))
- : chatHomes[shard].createChatRoom(id, name);
- }
-
- @Override
- public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
- {
- int shard = selectShard(id);
- return chatHomes[shard] == null
- ? Mono.error(new ShardNotOwnedException(shard))
- : chatHomes[shard]
- .getChatRoomInfo(id)
- .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
- ? new UnknownChatroomException(
- id,
- shard,
- ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
- : throwable);
- }
-
- @Override
- public Flux<ChatRoomInfo> getChatRoomInfo()
- {
- return Flux
- .fromIterable(ownedShards)
- .flatMap(shard -> chatHomes[shard].getChatRoomInfo());
- }
-
- @Override
- public Mono<ChatRoomData> getChatRoomData(UUID id)
- {
- int shard = selectShard(id);
- return chatHomes[shard] == null
- ? Mono.error(new ShardNotOwnedException(shard))
- : chatHomes[shard]
- .getChatRoomData(id)
- .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
- ? new UnknownChatroomException(
- id,
- shard,
- ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
- : throwable);
- }
-
- public Flux<ChatRoomData> getChatRoomData()
- {
- return Flux
- .fromIterable(ownedShards)
- .flatMap(shard -> chatHomes[shard].getChatRoomData());
- }
-
-
-
- private int selectShard(UUID chatroomId)
- {
- return shardingStrategy.selectShard(chatroomId);
- }
-}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import de.juplo.kafka.chat.backend.domain.*;
+import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
+import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+
+@Slf4j
+public class ShardedChatHome implements ChatHome
+{
+ private final SimpleChatHome[] chatHomes;
+ private final Set<Integer> ownedShards;
+ private final ShardingStrategy shardingStrategy;
+
+
+ public ShardedChatHome(
+ SimpleChatHome[] chatHomes,
+ ShardingStrategy shardingStrategy)
+ {
+ this.chatHomes = chatHomes;
+ this.shardingStrategy = shardingStrategy;
+ this.ownedShards = new HashSet<>();
+ for (int shard = 0; shard < chatHomes.length; shard++)
+ if(chatHomes[shard] != null)
+ this.ownedShards.add(shard);
+ log.info(
+ "Created ShardedChatHome for shards: {}",
+ ownedShards
+ .stream()
+ .map(String::valueOf)
+ .collect(Collectors.joining(", ")));
+ }
+
+
+ @Override
+ public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
+ {
+ int shard = shardingStrategy.selectShard(id);
+ return chatHomes[shard] == null
+ ? Mono.error(new ShardNotOwnedException(shard))
+ : chatHomes[shard].createChatRoom(id, name);
+ }
+
+ @Override
+ public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
+ {
+ int shard = selectShard(id);
+ return chatHomes[shard] == null
+ ? Mono.error(new ShardNotOwnedException(shard))
+ : chatHomes[shard]
+ .getChatRoomInfo(id)
+ .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
+ ? new UnknownChatroomException(
+ id,
+ shard,
+ ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
+ : throwable);
+ }
+
+ @Override
+ public Flux<ChatRoomInfo> getChatRoomInfo()
+ {
+ return Flux
+ .fromIterable(ownedShards)
+ .flatMap(shard -> chatHomes[shard].getChatRoomInfo());
+ }
+
+ @Override
+ public Mono<ChatRoomData> getChatRoomData(UUID id)
+ {
+ int shard = selectShard(id);
+ return chatHomes[shard] == null
+ ? Mono.error(new ShardNotOwnedException(shard))
+ : chatHomes[shard]
+ .getChatRoomData(id)
+ .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
+ ? new UnknownChatroomException(
+ id,
+ shard,
+ ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
+ : throwable);
+ }
+
+ public Flux<ChatRoomData> getChatRoomData()
+ {
+ return Flux
+ .fromIterable(ownedShards)
+ .flatMap(shard -> chatHomes[shard].getChatRoomData());
+ }
+
+
+
+ private int selectShard(UUID chatroomId)
+ {
+ return shardingStrategy.selectShard(chatroomId);
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.inmemory;
-
-import de.juplo.kafka.chat.backend.domain.*;
-import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.time.Clock;
-import java.util.*;
-
-
-@Slf4j
-public class SimpleChatHome implements ChatHome
-{
- private final Integer shard;
- private final Map<UUID, ChatRoomInfo> chatRoomInfo;
- private final Map<UUID, ChatRoomData> chatRoomData;
- private final Clock clock;
- private final int bufferSize;
-
-
-
- public SimpleChatHome(
- StorageStrategy storageStrategy,
- Clock clock,
- int bufferSize)
- {
- this(
- null,
- storageStrategy,
- clock,
- bufferSize);
- }
-
- public SimpleChatHome(
- Integer shard,
- StorageStrategy storageStrategy,
- Clock clock,
- int bufferSize)
- {
- log.info("Created SimpleChatHome for shard {}", shard);
-;
- this.shard = shard;
- this.chatRoomInfo = new HashMap<>();
- this.chatRoomData = new HashMap<>();
- storageStrategy
- .readChatRoomInfo()
- .filter(info ->
- {
- if (shard == null || info.getShard() == shard)
- {
- return true;
- }
- else
- {
- log.info(
- "SimpleChatHome for shard {} ignores not owned chat-room {}",
- shard,
- info);
- return false;
- }
- })
- .toStream()
- .forEach(info ->
- {
- UUID chatRoomId = info.getId();
- chatRoomInfo.put(chatRoomId, info);
- Flux<Message> messageFlux =
- storageStrategy.readChatRoomData(chatRoomId);
- chatRoomData.put(
- info.getId(),
- new ChatRoomData(
- clock,
- new InMemoryChatRoomService(messageFlux),
- bufferSize));
- });
- this.clock = clock;
- this.bufferSize = bufferSize;
- }
-
-
- @Override
- public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
- {
- log.info("Creating ChatRoom with buffer-size {}", bufferSize);
- ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
- ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
- this.chatRoomInfo.put(id, chatRoomInfo);
- ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
- this.chatRoomData.put(id, chatRoomData);
- return Mono.just(chatRoomInfo);
- }
-
- @Override
- public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
- {
- return Mono
- .justOrEmpty(chatRoomInfo.get(id))
- .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
- }
-
- @Override
- public Flux<ChatRoomInfo> getChatRoomInfo()
- {
- return Flux.fromIterable(chatRoomInfo.values());
- }
-
- @Override
- public Mono<ChatRoomData> getChatRoomData(UUID id)
- {
- return Mono
- .justOrEmpty(chatRoomData.get(id))
- .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
- }
-
- public Flux<ChatRoomData> getChatRoomData()
- {
- return Flux.fromIterable(chatRoomData.values());
- }
-}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import de.juplo.kafka.chat.backend.domain.*;
+import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.Clock;
+import java.util.*;
+
+
+@Slf4j
+public class SimpleChatHome implements ChatHome
+{
+ private final Integer shard;
+ private final Map<UUID, ChatRoomInfo> chatRoomInfo;
+ private final Map<UUID, ChatRoomData> chatRoomData;
+ private final Clock clock;
+ private final int bufferSize;
+
+
+
+ public SimpleChatHome(
+ StorageStrategy storageStrategy,
+ Clock clock,
+ int bufferSize)
+ {
+ this(
+ null,
+ storageStrategy,
+ clock,
+ bufferSize);
+ }
+
+ public SimpleChatHome(
+ Integer shard,
+ StorageStrategy storageStrategy,
+ Clock clock,
+ int bufferSize)
+ {
+ log.info("Created SimpleChatHome for shard {}", shard);
+;
+ this.shard = shard;
+ this.chatRoomInfo = new HashMap<>();
+ this.chatRoomData = new HashMap<>();
+ storageStrategy
+ .readChatRoomInfo()
+ .filter(info ->
+ {
+ if (shard == null || info.getShard() == shard)
+ {
+ return true;
+ }
+ else
+ {
+ log.info(
+ "SimpleChatHome for shard {} ignores not owned chat-room {}",
+ shard,
+ info);
+ return false;
+ }
+ })
+ .toStream()
+ .forEach(info ->
+ {
+ UUID chatRoomId = info.getId();
+ chatRoomInfo.put(chatRoomId, info);
+ Flux<Message> messageFlux =
+ storageStrategy.readChatRoomData(chatRoomId);
+ chatRoomData.put(
+ info.getId(),
+ new ChatRoomData(
+ clock,
+ new InMemoryChatRoomService(messageFlux),
+ bufferSize));
+ });
+ this.clock = clock;
+ this.bufferSize = bufferSize;
+ }
+
+
+ @Override
+ public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
+ {
+ log.info("Creating ChatRoom with buffer-size {}", bufferSize);
+ ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
+ ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
+ this.chatRoomInfo.put(id, chatRoomInfo);
+ ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
+ this.chatRoomData.put(id, chatRoomData);
+ return Mono.just(chatRoomInfo);
+ }
+
+ @Override
+ public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
+ {
+ return Mono
+ .justOrEmpty(chatRoomInfo.get(id))
+ .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
+ }
+
+ @Override
+ public Flux<ChatRoomInfo> getChatRoomInfo()
+ {
+ return Flux.fromIterable(chatRoomInfo.values());
+ }
+
+ @Override
+ public Mono<ChatRoomData> getChatRoomData(UUID id)
+ {
+ return Mono
+ .justOrEmpty(chatRoomData.get(id))
+ .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
+ }
+
+ public Flux<ChatRoomData> getChatRoomData()
+ {
+ return Flux.fromIterable(chatRoomData.values());
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.ChatRoomData;
-import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.utils.Utils;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.*;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class KafkaChatHome implements ChatHome
-{
- private final int numPartitions;
- private final ChatRoomChannel chatRoomChannel;
-
-
-
- @Override
- public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
- {
- log.info("Sending create-command for chat rooom: id={}, name={}");
- return chatRoomChannel.sendCreateChatRoomRequest(id, name);
- }
-
- @Override
- public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
- {
- int shard = selectShard(id);
- return chatRoomChannel
- .getChatRoomInfo(shard, id)
- .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
- id,
- shard,
- chatRoomChannel.getOwnedShards())));
- }
-
- @Override
- public Flux<ChatRoomInfo> getChatRoomInfo()
- {
- return chatRoomChannel.getChatRoomInfo();
- }
-
- @Override
- public Mono<ChatRoomData> getChatRoomData(UUID id)
- {
- int shard = selectShard(id);
- return chatRoomChannel
- .getChatRoomData(shard, id)
- .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
- id,
- shard,
- chatRoomChannel.getOwnedShards())));
- }
-
- public Flux<ChatRoomData> getChatRoomData()
- {
- return chatRoomChannel.getChatRoomData();
- }
-
- int selectShard(UUID chatRoomId)
- {
- byte[] serializedKey = chatRoomId.toString().getBytes();
- return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
- }
-}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.domain.ChatRoomData;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.utils.Utils;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.*;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class KafkaChatHome implements ChatHome
+{
+ private final int numPartitions;
+ private final ChatRoomChannel chatRoomChannel;
+
+
+
+ @Override
+ public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
+ {
+ log.info("Sending create-command for chat rooom: id={}, name={}");
+ return chatRoomChannel.sendCreateChatRoomRequest(id, name);
+ }
+
+ @Override
+ public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
+ {
+ int shard = selectShard(id);
+ return chatRoomChannel
+ .getChatRoomInfo(shard, id)
+ .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
+ id,
+ shard,
+ chatRoomChannel.getOwnedShards())));
+ }
+
+ @Override
+ public Flux<ChatRoomInfo> getChatRoomInfo()
+ {
+ return chatRoomChannel.getChatRoomInfo();
+ }
+
+ @Override
+ public Mono<ChatRoomData> getChatRoomData(UUID id)
+ {
+ int shard = selectShard(id);
+ return chatRoomChannel
+ .getChatRoomData(shard, id)
+ .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
+ id,
+ shard,
+ chatRoomChannel.getOwnedShards())));
+ }
+
+ public Flux<ChatRoomData> getChatRoomData()
+ {
+ return chatRoomChannel.getChatRoomData();
+ }
+
+ int selectShard(UUID chatRoomId)
+ {
+ byte[] serializedKey = chatRoomId.toString().getBytes();
+ return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.domain;
+
+import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
+import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+
+import java.time.Duration;
+import java.util.UUID;
+
+import static pl.rzrz.assertj.reactor.Assertions.assertThat;
+
+
+@ExtendWith(SpringExtension.class)
+public abstract class ChatHomeTest
+{
+ @Autowired
+ ChatHome chatHome;
+
+
+ @Test
+ @DisplayName("Assert chatroom is delivered, if it exists")
+ void testGetExistingChatroom()
+ {
+ // Given
+ UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
+
+ // When
+ Mono<ChatRoomData> mono = Mono
+ .defer(() -> chatHome.getChatRoomData(chatRoomId))
+ .log("testGetExistingChatroom")
+ .retryWhen(Retry
+ .backoff(5, Duration.ofSeconds(1))
+ .filter(throwable -> throwable instanceof LoadInProgressException));
+
+ // Then
+ assertThat(mono).emitsCount(1);
+ }
+
+ @Test
+ @DisplayName("Assert UnknownChatroomException is thrown, if chatroom does not exist")
+ void testGetNonExistentChatroom()
+ {
+ // Given
+ UUID chatRoomId = UUID.fromString("7f59ec77-832e-4a17-8d22-55ef46242c17");
+
+ // When
+ Mono<ChatRoomData> mono = Mono
+ .defer(() -> chatHome.getChatRoomData(chatRoomId))
+ .log("testGetNonExistentChatroom")
+ .retryWhen(Retry
+ .backoff(5, Duration.ofSeconds(1))
+ .filter(throwable -> throwable instanceof LoadInProgressException));
+
+ // Then
+ assertThat(mono).sendsError(e ->
+ {
+ assertThat(e).isInstanceOf(UnknownChatroomException.class);
+ UnknownChatroomException unknownChatroomException = (UnknownChatroomException) e;
+ assertThat(unknownChatroomException.getChatroomId()).isEqualTo(chatRoomId);
+ });
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.domain;
+
+import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
+import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+
+import java.time.Duration;
+import java.util.UUID;
+
+import static pl.rzrz.assertj.reactor.Assertions.assertThat;
+
+
+public abstract class ChatHomeWithShardsTest extends ChatHomeTest
+{
+ public static final int NUM_SHARDS = 10;
+ public static final int OWNED_SHARD = 2;
+ public static final int NOT_OWNED_SHARD = 0;
+
+
+ @Test
+ @DisplayName("Assert ShardNotOwnedException is thrown, if the shard for the chatroom is not owned")
+ void testGetChatroomForNotOwnedShard()
+ {
+ // Given
+ UUID chatRoomId = UUID.fromString("4e7246a6-29ae-43ea-b56f-669c3481ac19");
+
+ // When
+ Mono<ChatRoomData> mono = Mono
+ .defer(() -> chatHome.getChatRoomData(chatRoomId))
+ .log("testGetChatroomForNotOwnedShard")
+ .retryWhen(Retry
+ .backoff(5, Duration.ofSeconds(1))
+ .filter(throwable -> throwable instanceof LoadInProgressException));
+
+ // Then
+ assertThat(mono).sendsError(e ->
+ {
+ assertThat(e).isInstanceOf(ShardNotOwnedException.class);
+ ShardNotOwnedException shardNotOwnedException = (ShardNotOwnedException) e;
+ assertThat(shardNotOwnedException.getShard()).isEqualTo(NOT_OWNED_SHARD);
+ });
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.domain;
-
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
-import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.test.context.junit.jupiter.SpringExtension;
-import reactor.core.publisher.Mono;
-import reactor.util.retry.Retry;
-
-import java.time.Duration;
-import java.util.UUID;
-
-import static pl.rzrz.assertj.reactor.Assertions.assertThat;
-
-
-@ExtendWith(SpringExtension.class)
-public abstract class ChatHomeTest
-{
- @Autowired
- ChatHome chatHome;
-
-
- @Test
- @DisplayName("Assert chatroom is delivered, if it exists")
- void testGetExistingChatroom()
- {
- // Given
- UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
-
- // When
- Mono<ChatRoomData> mono = Mono
- .defer(() -> chatHome.getChatRoomData(chatRoomId))
- .log("testGetExistingChatroom")
- .retryWhen(Retry
- .backoff(5, Duration.ofSeconds(1))
- .filter(throwable -> throwable instanceof LoadInProgressException));
-
- // Then
- assertThat(mono).emitsCount(1);
- }
-
- @Test
- @DisplayName("Assert UnknownChatroomException is thrown, if chatroom does not exist")
- void testGetNonExistentChatroom()
- {
- // Given
- UUID chatRoomId = UUID.fromString("7f59ec77-832e-4a17-8d22-55ef46242c17");
-
- // When
- Mono<ChatRoomData> mono = Mono
- .defer(() -> chatHome.getChatRoomData(chatRoomId))
- .log("testGetNonExistentChatroom")
- .retryWhen(Retry
- .backoff(5, Duration.ofSeconds(1))
- .filter(throwable -> throwable instanceof LoadInProgressException));
-
- // Then
- assertThat(mono).sendsError(e ->
- {
- assertThat(e).isInstanceOf(UnknownChatroomException.class);
- UnknownChatroomException unknownChatroomException = (UnknownChatroomException) e;
- assertThat(unknownChatroomException.getChatroomId()).isEqualTo(chatRoomId);
- });
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.domain;
-
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
-import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import reactor.core.publisher.Mono;
-import reactor.util.retry.Retry;
-
-import java.time.Duration;
-import java.util.UUID;
-
-import static pl.rzrz.assertj.reactor.Assertions.assertThat;
-
-
-public abstract class ChatHomeWithShardsTest extends ChatHomeTest
-{
- public static final int NUM_SHARDS = 10;
- public static final int OWNED_SHARD = 2;
- public static final int NOT_OWNED_SHARD = 0;
-
-
- @Test
- @DisplayName("Assert ShardNotOwnedException is thrown, if the shard for the chatroom is not owned")
- void testGetChatroomForNotOwnedShard()
- {
- // Given
- UUID chatRoomId = UUID.fromString("4e7246a6-29ae-43ea-b56f-669c3481ac19");
-
- // When
- Mono<ChatRoomData> mono = Mono
- .defer(() -> chatHome.getChatRoomData(chatRoomId))
- .log("testGetChatroomForNotOwnedShard")
- .retryWhen(Retry
- .backoff(5, Duration.ofSeconds(1))
- .filter(throwable -> throwable instanceof LoadInProgressException));
-
- // Then
- assertThat(mono).sendsError(e ->
- {
- assertThat(e).isInstanceOf(ShardNotOwnedException.class);
- ShardNotOwnedException shardNotOwnedException = (ShardNotOwnedException) e;
- assertThat(shardNotOwnedException.getShard()).isEqualTo(NOT_OWNED_SHARD);
- });
- }
-}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest;
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+
+import java.nio.file.Paths;
+import java.time.Clock;
+import java.util.stream.IntStream;
+
+public class ShardedChatHomeTest extends ChatHomeWithShardsTest
+{
+ @TestConfiguration
+ static class Configuration
+ {
+ @Bean
+ ShardedChatHome chatHome(
+ StorageStrategy storageStrategy,
+ Clock clock)
+ {
+ SimpleChatHome[] chatHomes = new SimpleChatHome[NUM_SHARDS];
+
+ IntStream
+ .of(ownedShards())
+ .forEach(shard -> chatHomes[shard] = new SimpleChatHome(
+ shard,
+ storageStrategy,
+ clock,
+ bufferSize()));
+
+ ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS);
+
+ return new ShardedChatHome(chatHomes, strategy);
+ }
+
+ @Bean
+ public FilesStorageStrategy storageStrategy(Clock clock)
+ {
+ return new FilesStorageStrategy(
+ Paths.get("target", "test-classes", "data", "files"),
+ new KafkaLikeShardingStrategy(NUM_SHARDS),
+ new ObjectMapper());
+ }
+
+ @Bean
+ Clock clock()
+ {
+ return Clock.systemDefaultZone();
+ }
+
+ int[] ownedShards()
+ {
+ return new int[] { OWNED_SHARD };
+ }
+
+ int bufferSize()
+ {
+ return 8;
+ }
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.inmemory;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest;
-import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-
-import java.nio.file.Paths;
-import java.time.Clock;
-import java.util.stream.IntStream;
-
-public class ShardedChatHomeTest extends ChatHomeWithShardsTest
-{
- @TestConfiguration
- static class Configuration
- {
- @Bean
- ShardedChatHome chatHome(
- StorageStrategy storageStrategy,
- Clock clock)
- {
- SimpleChatHome[] chatHomes = new SimpleChatHome[NUM_SHARDS];
-
- IntStream
- .of(ownedShards())
- .forEach(shard -> chatHomes[shard] = new SimpleChatHome(
- shard,
- storageStrategy,
- clock,
- bufferSize()));
-
- ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS);
-
- return new ShardedChatHome(chatHomes, strategy);
- }
-
- @Bean
- public FilesStorageStrategy storageStrategy(Clock clock)
- {
- return new FilesStorageStrategy(
- Paths.get("target", "test-classes", "data", "files"),
- new KafkaLikeShardingStrategy(NUM_SHARDS),
- new ObjectMapper());
- }
-
- @Bean
- Clock clock()
- {
- return Clock.systemDefaultZone();
- }
-
- int[] ownedShards()
- {
- return new int[] { OWNED_SHARD };
- }
-
- int bufferSize()
- {
- return 8;
- }
- }
-}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.domain.ChatHomeTest;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+
+import java.nio.file.Paths;
+import java.time.Clock;
+
+
+public class SimpleChatHomeTest extends ChatHomeTest
+{
+ @TestConfiguration
+ static class Configuration
+ {
+ @Bean
+ SimpleChatHome chatHome(
+ StorageStrategy storageStrategy,
+ Clock clock)
+ {
+ return new SimpleChatHome(
+ storageStrategy,
+ clock,
+ bufferSize());
+ }
+
+ @Bean
+ public FilesStorageStrategy storageStrategy(Clock clock)
+ {
+ return new FilesStorageStrategy(
+ Paths.get("target", "test-classes", "data", "files"),
+ chatRoomId -> 0,
+ new ObjectMapper());
+ }
+
+ @Bean
+ Clock clock()
+ {
+ return Clock.systemDefaultZone();
+ }
+
+ int bufferSize()
+ {
+ return 8;
+ }
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.inmemory;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.chat.backend.domain.ChatHomeTest;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-
-import java.nio.file.Paths;
-import java.time.Clock;
-
-
-public class SimpleChatHomeTest extends ChatHomeTest
-{
- @TestConfiguration
- static class Configuration
- {
- @Bean
- SimpleChatHome chatHome(
- StorageStrategy storageStrategy,
- Clock clock)
- {
- return new SimpleChatHome(
- storageStrategy,
- clock,
- bufferSize());
- }
-
- @Bean
- public FilesStorageStrategy storageStrategy(Clock clock)
- {
- return new FilesStorageStrategy(
- Paths.get("target", "test-classes", "data", "files"),
- chatRoomId -> 0,
- new ObjectMapper());
- }
-
- @Bean
- Clock clock()
- {
- return Clock.systemDefaultZone();
- }
-
- int bufferSize()
- {
- return 8;
- }
- }
-}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
+import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.time.Clock;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest.NUM_SHARDS;
+import static de.juplo.kafka.chat.backend.persistence.kafka.KafkaChatHomeTest.TOPIC;
+
+
+@SpringBootTest(
+ classes = {
+ KafkaChatHomeTest.KafkaChatHomeTestConfiguration.class,
+ KafkaServicesConfiguration.class,
+ KafkaAutoConfiguration.class,
+ TaskExecutionAutoConfiguration.class,
+ },
+ properties = {
+ "chat.backend.services=kafka",
+ "chat.backend.kafka.client-id-PREFIX=TEST",
+ "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+ "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+ "chat.backend.kafka.chatroom-channel-topic=" + TOPIC,
+ "chat.backend.kafka.num-partitions=" + NUM_SHARDS,
+})
+@EmbeddedKafka(topics = { TOPIC }, partitions = 10)
+@Slf4j
+public class KafkaChatHomeTest extends ChatHomeWithShardsTest
+{
+ final static String TOPIC = "KAFKA_CHAT_HOME_TEST";
+
+ static CompletableFuture<Void> CONSUMER_JOB;
+
+
+ @TestConfiguration
+ @EnableConfigurationProperties(ChatBackendProperties.class)
+ static class KafkaChatHomeTestConfiguration
+ {
+ @Bean
+ Clock clock()
+ {
+ return Clock.systemDefaultZone();
+ }
+ }
+
+
+ @BeforeAll
+ public static void sendAndLoadStoredData(
+ @Autowired KafkaTemplate<String, String> messageTemplate,
+ @Autowired Consumer chatRoomChannelConsumer,
+ @Autowired ThreadPoolTaskExecutor taskExecutor,
+ @Autowired ChatRoomChannel chatRoomChannel)
+ {
+ send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
+ send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
+
+ List<TopicPartition> assignedPartitions = List.of(new TopicPartition(TOPIC, 2));
+ chatRoomChannelConsumer.assign(assignedPartitions);
+ chatRoomChannel.onPartitionsAssigned(assignedPartitions);
+ CONSUMER_JOB = taskExecutor
+ .submitCompletable(chatRoomChannel)
+ .exceptionally(e ->
+ {
+ log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
+ return null;
+ });
+ }
+
+ static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
+ {
+ ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
+ record.headers().add("__TypeId__", typeId.getBytes());
+ SendResult<String, String> result = kafkaTemplate.send(record).join();
+ log.info(
+ "Sent {}={} to {}",
+ key,
+ value,
+ new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
+ }
+
+ @AfterAll
+ static void joinConsumerJob(@Autowired Consumer chatRoomChannelConsumer)
+ {
+ log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
+ chatRoomChannelConsumer.wakeup();
+ log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
+ CONSUMER_JOB.join();
+ log.info("Joined the consumer of the ChatRoomChannel");
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
-import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.SendResult;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-
-import java.time.Clock;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-import static de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest.NUM_SHARDS;
-import static de.juplo.kafka.chat.backend.persistence.kafka.KafkaChatHomeTest.TOPIC;
-
-
-@SpringBootTest(
- classes = {
- KafkaChatHomeTest.KafkaChatHomeTestConfiguration.class,
- KafkaServicesConfiguration.class,
- KafkaAutoConfiguration.class,
- TaskExecutionAutoConfiguration.class,
- },
- properties = {
- "chat.backend.services=kafka",
- "chat.backend.kafka.client-id-PREFIX=TEST",
- "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
- "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
- "chat.backend.kafka.chatroom-channel-topic=" + TOPIC,
- "chat.backend.kafka.num-partitions=" + NUM_SHARDS,
-})
-@EmbeddedKafka(topics = { TOPIC }, partitions = 10)
-@Slf4j
-public class KafkaChatHomeTest extends ChatHomeWithShardsTest
-{
- final static String TOPIC = "KAFKA_CHAT_HOME_TEST";
-
- static CompletableFuture<Void> CONSUMER_JOB;
-
-
- @TestConfiguration
- @EnableConfigurationProperties(ChatBackendProperties.class)
- static class KafkaChatHomeTestConfiguration
- {
- @Bean
- Clock clock()
- {
- return Clock.systemDefaultZone();
- }
- }
-
-
- @BeforeAll
- public static void sendAndLoadStoredData(
- @Autowired KafkaTemplate<String, String> messageTemplate,
- @Autowired Consumer chatRoomChannelConsumer,
- @Autowired ThreadPoolTaskExecutor taskExecutor,
- @Autowired ChatRoomChannel chatRoomChannel)
- {
- send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom");
- send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
- send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
- send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
- send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
-
- List<TopicPartition> assignedPartitions = List.of(new TopicPartition(TOPIC, 2));
- chatRoomChannelConsumer.assign(assignedPartitions);
- chatRoomChannel.onPartitionsAssigned(assignedPartitions);
- CONSUMER_JOB = taskExecutor
- .submitCompletable(chatRoomChannel)
- .exceptionally(e ->
- {
- log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
- return null;
- });
- }
-
- static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
- {
- ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
- record.headers().add("__TypeId__", typeId.getBytes());
- SendResult<String, String> result = kafkaTemplate.send(record).join();
- log.info(
- "Sent {}={} to {}",
- key,
- value,
- new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
- }
-
- @AfterAll
- static void joinConsumerJob(@Autowired Consumer chatRoomChannelConsumer)
- {
- log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
- chatRoomChannelConsumer.wakeup();
- log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
- CONSUMER_JOB.join();
- log.info("Joined the consumer of the ChatRoomChannel");
- }
-}