]> juplo.de Git - demos/kafka/chat/commitdiff
test: refactor: Refined naming of `ChatHomeServiceIT` -- MOVE
authorKai Moritz <kai@juplo.de>
Mon, 25 Mar 2024 23:01:05 +0000 (00:01 +0100)
committerKai Moritz <kai@juplo.de>
Mon, 25 Mar 2024 23:01:05 +0000 (00:01 +0100)
src/test/java/de/juplo/kafka/chat/backend/domain/AbstractChatHomeServiceIT.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/domain/AbstractChatHomeServiceWithShardsIT.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceTest.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceWithShardsTest.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceWithShardsIT.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeServiceIT.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeServiceTest.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceWithShardsIT.java [new file with mode: 0644]

diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/AbstractChatHomeServiceIT.java b/src/test/java/de/juplo/kafka/chat/backend/domain/AbstractChatHomeServiceIT.java
new file mode 100644 (file)
index 0000000..3be9a35
--- /dev/null
@@ -0,0 +1,98 @@
+package de.juplo.kafka.chat.backend.domain;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.implementation.kafka.ChannelNotReadyException;
+import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
+import de.juplo.kafka.chat.backend.implementation.inmemory.InMemoryServicesConfiguration;
+import de.juplo.kafka.chat.backend.implementation.kafka.KafkaServicesConfiguration;
+import de.juplo.kafka.chat.backend.storage.files.FilesStorageConfiguration;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.util.UUID;
+
+import static pl.rzrz.assertj.reactor.Assertions.assertThat;
+
+
+@SpringJUnitConfig(classes = {
+    InMemoryServicesConfiguration.class,
+    FilesStorageConfiguration.class,
+    KafkaServicesConfiguration.class,
+    ChatHomeServiceTest.TestConfiguration.class })
+@EnableConfigurationProperties(ChatBackendProperties.class)
+public abstract class ChatHomeServiceTest
+{
+  @Autowired
+  ChatHomeService chatHomeService;
+
+
+  @Test
+  @DisplayName("Assert chatroom is delivered, if it exists")
+  void testGetExistingChatroom()
+  {
+    // Given
+    UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
+
+    // When
+    Mono<ChatRoomData> mono = Mono
+        .defer(() -> chatHomeService.getChatRoomData(chatRoomId))
+        .log("testGetExistingChatroom")
+        .retryWhen(Retry
+            .backoff(5, Duration.ofSeconds(1))
+            .filter(throwable -> throwable instanceof ChannelNotReadyException));
+
+    // Then
+    assertThat(mono).emitsCount(1);
+  }
+
+  @Test
+  @DisplayName("Assert UnknownChatroomException is thrown, if chatroom does not exist")
+  void testGetNonExistentChatroom()
+  {
+    // Given
+    UUID chatRoomId = UUID.fromString("7f59ec77-832e-4a17-8d22-55ef46242c17");
+
+    // When
+    Mono<ChatRoomData> mono = Mono
+        .defer(() -> chatHomeService.getChatRoomData(chatRoomId))
+        .log("testGetNonExistentChatroom")
+        .retryWhen(Retry
+            .backoff(5, Duration.ofSeconds(1))
+            .filter(throwable -> throwable instanceof ChannelNotReadyException));
+
+    // Then
+    assertThat(mono).sendsError(e ->
+    {
+      assertThat(e).isInstanceOf(UnknownChatroomException.class);
+      UnknownChatroomException unknownChatroomException = (UnknownChatroomException) e;
+      assertThat(unknownChatroomException.getChatroomId()).isEqualTo(chatRoomId);
+    });
+  }
+
+  static class TestConfiguration
+  {
+    @Bean
+    ObjectMapper objectMapper()
+    {
+      ObjectMapper objectMapper = new ObjectMapper();
+      objectMapper.registerModule(new JavaTimeModule());
+      return objectMapper;
+    }
+
+    @Bean
+    Clock clock()
+    {
+      return Clock.systemDefaultZone();
+    }
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/AbstractChatHomeServiceWithShardsIT.java b/src/test/java/de/juplo/kafka/chat/backend/domain/AbstractChatHomeServiceWithShardsIT.java
new file mode 100644 (file)
index 0000000..6d15675
--- /dev/null
@@ -0,0 +1,46 @@
+package de.juplo.kafka.chat.backend.domain;
+
+import de.juplo.kafka.chat.backend.implementation.kafka.ChannelNotReadyException;
+import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+
+import java.time.Duration;
+import java.util.UUID;
+
+import static pl.rzrz.assertj.reactor.Assertions.assertThat;
+
+
+public abstract class ChatHomeServiceWithShardsTest extends ChatHomeServiceTest
+{
+  public static final int NUM_SHARDS = 10;
+  public static final int OWNED_SHARD = 2;
+  public static final int NOT_OWNED_SHARD = 0;
+
+
+  @Test
+  @DisplayName("Assert ShardNotOwnedException is thrown, if the shard for the chatroom is not owned")
+  void testGetChatroomForNotOwnedShard()
+  {
+    // Given
+    UUID chatRoomId = UUID.fromString("4e7246a6-29ae-43ea-b56f-669c3481ac19");
+
+    // When
+    Mono<ChatRoomData> mono = Mono
+        .defer(() -> chatHomeService.getChatRoomData(chatRoomId))
+        .log("testGetChatroomForNotOwnedShard")
+        .retryWhen(Retry
+            .backoff(5, Duration.ofSeconds(1))
+            .filter(throwable -> throwable instanceof ChannelNotReadyException));
+
+    // Then
+    assertThat(mono).sendsError(e ->
+    {
+      assertThat(e).isInstanceOf(ShardNotOwnedException.class);
+      ShardNotOwnedException shardNotOwnedException = (ShardNotOwnedException) e;
+      assertThat(shardNotOwnedException.getShard()).isEqualTo(NOT_OWNED_SHARD);
+    });
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceTest.java
deleted file mode 100644 (file)
index 3be9a35..0000000
+++ /dev/null
@@ -1,98 +0,0 @@
-package de.juplo.kafka.chat.backend.domain;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.implementation.kafka.ChannelNotReadyException;
-import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
-import de.juplo.kafka.chat.backend.implementation.inmemory.InMemoryServicesConfiguration;
-import de.juplo.kafka.chat.backend.implementation.kafka.KafkaServicesConfiguration;
-import de.juplo.kafka.chat.backend.storage.files.FilesStorageConfiguration;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
-import reactor.core.publisher.Mono;
-import reactor.util.retry.Retry;
-
-import java.time.Clock;
-import java.time.Duration;
-import java.util.UUID;
-
-import static pl.rzrz.assertj.reactor.Assertions.assertThat;
-
-
-@SpringJUnitConfig(classes = {
-    InMemoryServicesConfiguration.class,
-    FilesStorageConfiguration.class,
-    KafkaServicesConfiguration.class,
-    ChatHomeServiceTest.TestConfiguration.class })
-@EnableConfigurationProperties(ChatBackendProperties.class)
-public abstract class ChatHomeServiceTest
-{
-  @Autowired
-  ChatHomeService chatHomeService;
-
-
-  @Test
-  @DisplayName("Assert chatroom is delivered, if it exists")
-  void testGetExistingChatroom()
-  {
-    // Given
-    UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
-
-    // When
-    Mono<ChatRoomData> mono = Mono
-        .defer(() -> chatHomeService.getChatRoomData(chatRoomId))
-        .log("testGetExistingChatroom")
-        .retryWhen(Retry
-            .backoff(5, Duration.ofSeconds(1))
-            .filter(throwable -> throwable instanceof ChannelNotReadyException));
-
-    // Then
-    assertThat(mono).emitsCount(1);
-  }
-
-  @Test
-  @DisplayName("Assert UnknownChatroomException is thrown, if chatroom does not exist")
-  void testGetNonExistentChatroom()
-  {
-    // Given
-    UUID chatRoomId = UUID.fromString("7f59ec77-832e-4a17-8d22-55ef46242c17");
-
-    // When
-    Mono<ChatRoomData> mono = Mono
-        .defer(() -> chatHomeService.getChatRoomData(chatRoomId))
-        .log("testGetNonExistentChatroom")
-        .retryWhen(Retry
-            .backoff(5, Duration.ofSeconds(1))
-            .filter(throwable -> throwable instanceof ChannelNotReadyException));
-
-    // Then
-    assertThat(mono).sendsError(e ->
-    {
-      assertThat(e).isInstanceOf(UnknownChatroomException.class);
-      UnknownChatroomException unknownChatroomException = (UnknownChatroomException) e;
-      assertThat(unknownChatroomException.getChatroomId()).isEqualTo(chatRoomId);
-    });
-  }
-
-  static class TestConfiguration
-  {
-    @Bean
-    ObjectMapper objectMapper()
-    {
-      ObjectMapper objectMapper = new ObjectMapper();
-      objectMapper.registerModule(new JavaTimeModule());
-      return objectMapper;
-    }
-
-    @Bean
-    Clock clock()
-    {
-      return Clock.systemDefaultZone();
-    }
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceWithShardsTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceWithShardsTest.java
deleted file mode 100644 (file)
index 6d15675..0000000
+++ /dev/null
@@ -1,46 +0,0 @@
-package de.juplo.kafka.chat.backend.domain;
-
-import de.juplo.kafka.chat.backend.implementation.kafka.ChannelNotReadyException;
-import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import reactor.core.publisher.Mono;
-import reactor.util.retry.Retry;
-
-import java.time.Duration;
-import java.util.UUID;
-
-import static pl.rzrz.assertj.reactor.Assertions.assertThat;
-
-
-public abstract class ChatHomeServiceWithShardsTest extends ChatHomeServiceTest
-{
-  public static final int NUM_SHARDS = 10;
-  public static final int OWNED_SHARD = 2;
-  public static final int NOT_OWNED_SHARD = 0;
-
-
-  @Test
-  @DisplayName("Assert ShardNotOwnedException is thrown, if the shard for the chatroom is not owned")
-  void testGetChatroomForNotOwnedShard()
-  {
-    // Given
-    UUID chatRoomId = UUID.fromString("4e7246a6-29ae-43ea-b56f-669c3481ac19");
-
-    // When
-    Mono<ChatRoomData> mono = Mono
-        .defer(() -> chatHomeService.getChatRoomData(chatRoomId))
-        .log("testGetChatroomForNotOwnedShard")
-        .retryWhen(Retry
-            .backoff(5, Duration.ofSeconds(1))
-            .filter(throwable -> throwable instanceof ChannelNotReadyException));
-
-    // Then
-    assertThat(mono).sendsError(e ->
-    {
-      assertThat(e).isInstanceOf(ShardNotOwnedException.class);
-      ShardNotOwnedException shardNotOwnedException = (ShardNotOwnedException) e;
-      assertThat(shardNotOwnedException.getShard()).isEqualTo(NOT_OWNED_SHARD);
-    });
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java
deleted file mode 100644 (file)
index 95501ad..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-package de.juplo.kafka.chat.backend.implementation.inmemory;
-
-import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest;
-import org.springframework.test.context.TestPropertySource;
-
-import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS;
-import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.OWNED_SHARD;
-
-
-@TestPropertySource(properties = {
-    "chat.backend.inmemory.sharding-strategy=kafkalike",
-    "chat.backend.inmemory.num-shards=" + NUM_SHARDS,
-    "chat.backend.inmemory.owned-shards=" + OWNED_SHARD,
-    "chat.backend.inmemory.storage-strategy=files",
-    "chat.backend.inmemory.storage-directory=target/test-classes/data/files" })
-public class ShardedChatHomeServiceTest extends ChatHomeServiceWithShardsTest
-{
-}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceWithShardsIT.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceWithShardsIT.java
new file mode 100644 (file)
index 0000000..95501ad
--- /dev/null
@@ -0,0 +1,18 @@
+package de.juplo.kafka.chat.backend.implementation.inmemory;
+
+import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest;
+import org.springframework.test.context.TestPropertySource;
+
+import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS;
+import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.OWNED_SHARD;
+
+
+@TestPropertySource(properties = {
+    "chat.backend.inmemory.sharding-strategy=kafkalike",
+    "chat.backend.inmemory.num-shards=" + NUM_SHARDS,
+    "chat.backend.inmemory.owned-shards=" + OWNED_SHARD,
+    "chat.backend.inmemory.storage-strategy=files",
+    "chat.backend.inmemory.storage-directory=target/test-classes/data/files" })
+public class ShardedChatHomeServiceTest extends ChatHomeServiceWithShardsTest
+{
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeServiceIT.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeServiceIT.java
new file mode 100644 (file)
index 0000000..09369cb
--- /dev/null
@@ -0,0 +1,13 @@
+package de.juplo.kafka.chat.backend.implementation.inmemory;
+
+import de.juplo.kafka.chat.backend.domain.ChatHomeServiceTest;
+import org.springframework.test.context.TestPropertySource;
+
+
+@TestPropertySource(properties = {
+    "chat.backend.inmemory.sharding-strategy=none",
+    "chat.backend.inmemory.storage-strategy=files",
+    "chat.backend.inmemory.storage-directory=target/test-classes/data/files" })
+public class SimpleChatHomeServiceTest extends ChatHomeServiceTest
+{
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeServiceTest.java
deleted file mode 100644 (file)
index 09369cb..0000000
+++ /dev/null
@@ -1,13 +0,0 @@
-package de.juplo.kafka.chat.backend.implementation.inmemory;
-
-import de.juplo.kafka.chat.backend.domain.ChatHomeServiceTest;
-import org.springframework.test.context.TestPropertySource;
-
-
-@TestPropertySource(properties = {
-    "chat.backend.inmemory.sharding-strategy=none",
-    "chat.backend.inmemory.storage-strategy=files",
-    "chat.backend.inmemory.storage-directory=target/test-classes/data/files" })
-public class SimpleChatHomeServiceTest extends ChatHomeServiceTest
-{
-}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
deleted file mode 100644 (file)
index 1fec526..0000000
+++ /dev/null
@@ -1,80 +0,0 @@
-package de.juplo.kafka.chat.backend.implementation.kafka;
-
-import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.TopicPartition;
-import org.junit.jupiter.api.BeforeAll;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
-import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.test.context.ContextConfiguration;
-import org.springframework.test.context.TestPropertySource;
-
-import java.util.List;
-
-import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS;
-import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.DATA_TOPIC;
-import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.INFO_TOPIC;
-
-
-@ContextConfiguration(classes = {
-        KafkaTestUtils.KafkaTestConfiguration.class,
-        KafkaAutoConfiguration.class,
-        TaskExecutionAutoConfiguration.class,
-    })
-@TestPropertySource(properties = {
-        "chat.backend.services=kafka",
-        "chat.backend.kafka.client-id-PREFIX=TEST",
-        "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
-        "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
-        "chat.backend.kafka.info-channel-topic=" + INFO_TOPIC,
-        "chat.backend.kafka.data-channel-topic=" + DATA_TOPIC,
-        "chat.backend.kafka.num-partitions=" + NUM_SHARDS,
-})
-@EmbeddedKafka(
-    topics = { INFO_TOPIC, DATA_TOPIC },
-    partitions = NUM_SHARDS)
-@Slf4j
-public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
-{
-  final static String INFO_TOPIC = "KAFKA_CHAT_HOME_TEST_INFO";
-  final static String DATA_TOPIC = "KAFKA_CHAT_HOME_TEST_DATA";
-
-
-  @BeforeAll
-  static void sendAndLoadStoredData(
-      @Autowired KafkaTemplate<String, String> messageTemplate,
-      @Autowired ChannelTaskExecutor infoChannelTaskExecutor,
-      @Autowired ChannelTaskExecutor dataChannelTaskExecutor)
-  {
-    KafkaTestUtils.initKafkaSetup(
-        INFO_TOPIC,
-        DATA_TOPIC,
-        messageTemplate,
-        infoChannelTaskExecutor,
-        dataChannelTaskExecutor);
-  }
-
-
-  @TestConfiguration
-  static class KafkaChatHomeServiceTestConfiguration
-  {
-    @Bean
-    WorkAssignor infoChannelWorkAssignor()
-    {
-      return consumer ->
-      {
-        List<TopicPartition> partitions = consumer
-            .partitionsFor(INFO_TOPIC)
-            .stream()
-            .map(partitionInfo -> new TopicPartition(INFO_TOPIC, partitionInfo.partition()))
-            .toList();
-        consumer.assign(partitions);
-      };
-    }
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceWithShardsIT.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceWithShardsIT.java
new file mode 100644 (file)
index 0000000..1fec526
--- /dev/null
@@ -0,0 +1,80 @@
+package de.juplo.kafka.chat.backend.implementation.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.BeforeAll;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
+import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.TestPropertySource;
+
+import java.util.List;
+
+import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS;
+import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.DATA_TOPIC;
+import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.INFO_TOPIC;
+
+
+@ContextConfiguration(classes = {
+        KafkaTestUtils.KafkaTestConfiguration.class,
+        KafkaAutoConfiguration.class,
+        TaskExecutionAutoConfiguration.class,
+    })
+@TestPropertySource(properties = {
+        "chat.backend.services=kafka",
+        "chat.backend.kafka.client-id-PREFIX=TEST",
+        "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+        "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+        "chat.backend.kafka.info-channel-topic=" + INFO_TOPIC,
+        "chat.backend.kafka.data-channel-topic=" + DATA_TOPIC,
+        "chat.backend.kafka.num-partitions=" + NUM_SHARDS,
+})
+@EmbeddedKafka(
+    topics = { INFO_TOPIC, DATA_TOPIC },
+    partitions = NUM_SHARDS)
+@Slf4j
+public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
+{
+  final static String INFO_TOPIC = "KAFKA_CHAT_HOME_TEST_INFO";
+  final static String DATA_TOPIC = "KAFKA_CHAT_HOME_TEST_DATA";
+
+
+  @BeforeAll
+  static void sendAndLoadStoredData(
+      @Autowired KafkaTemplate<String, String> messageTemplate,
+      @Autowired ChannelTaskExecutor infoChannelTaskExecutor,
+      @Autowired ChannelTaskExecutor dataChannelTaskExecutor)
+  {
+    KafkaTestUtils.initKafkaSetup(
+        INFO_TOPIC,
+        DATA_TOPIC,
+        messageTemplate,
+        infoChannelTaskExecutor,
+        dataChannelTaskExecutor);
+  }
+
+
+  @TestConfiguration
+  static class KafkaChatHomeServiceTestConfiguration
+  {
+    @Bean
+    WorkAssignor infoChannelWorkAssignor()
+    {
+      return consumer ->
+      {
+        List<TopicPartition> partitions = consumer
+            .partitionsFor(INFO_TOPIC)
+            .stream()
+            .map(partitionInfo -> new TopicPartition(INFO_TOPIC, partitionInfo.partition()))
+            .toList();
+        consumer.assign(partitions);
+      };
+    }
+  }
+}