package de.juplo.kafka.chat.backend;
import de.juplo.kafka.chat.backend.implementation.kafka.ChannelTaskExecutor;
-import de.juplo.kafka.chat.backend.implementation.kafka.DataChannel;
import de.juplo.kafka.chat.backend.implementation.kafka.KafkaTestUtils;
-import de.juplo.kafka.chat.backend.implementation.kafka.WorkAssignor;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.DATA_TOPIC;
import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.INFO_TOPIC;
@BeforeAll
static void sendAndLoadStoredData(
@Autowired KafkaTemplate<String, String> messageTemplate,
+ @Autowired ChannelTaskExecutor infoChannelTaskExecutor,
@Autowired ChannelTaskExecutor dataChannelTaskExecutor)
{
- KafkaTestUtils.sendAndLoadStoredData(
- messageTemplate,
+ KafkaTestUtils.initKafkaSetup(
INFO_TOPIC,
- DATA_TOPIC);
-
- // The initialization of the data-channel must happen,
- // after the messages were sent into the topic of the
- // test-cluster.
- // Otherwise, the initial loading of the data might be
- // completed, before these messages arrive, so that
- // they are ignored and the state is never restored.
- dataChannelTaskExecutor.executeChannelTask();
+ DATA_TOPIC,
+ messageTemplate,
+ infoChannelTaskExecutor,
+ dataChannelTaskExecutor);
}
@Import(KafkaTestUtils.KafkaTestConfiguration.class)
static class KafkaConfigurationITConfiguration
{
- /**
- * The definition of this bean has to be overruled, so
- * that the configuration of the `initMethod`, which
- * has to be called explicitly, _after_ the messages
- * were sent to and received by the test-culster, can
- * be dropped.
- */
- @Bean(destroyMethod = "join")
- ChannelTaskExecutor dataChannelTaskExecutor(
- ThreadPoolTaskExecutor taskExecutor,
- DataChannel dataChannel,
- Consumer<String, AbstractMessageTo> dataChannelConsumer,
- WorkAssignor dataChannelWorkAssignor)
- {
- return new ChannelTaskExecutor(
- taskExecutor,
- dataChannel,
- dataChannelConsumer,
- dataChannelWorkAssignor);
- }
}
}
import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
+import java.util.List;
+
import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS;
import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.DATA_TOPIC;
import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.INFO_TOPIC;
@BeforeAll
static void sendAndLoadStoredData(
- @Autowired KafkaTemplate<String, String> messageTemplate)
+ @Autowired KafkaTemplate<String, String> messageTemplate,
+ @Autowired ChannelTaskExecutor infoChannelTaskExecutor,
+ @Autowired ChannelTaskExecutor dataChannelTaskExecutor)
{
- KafkaTestUtils.sendAndLoadStoredData(
- messageTemplate,
+ KafkaTestUtils.initKafkaSetup(
INFO_TOPIC,
- DATA_TOPIC);
+ DATA_TOPIC,
+ messageTemplate,
+ infoChannelTaskExecutor,
+ dataChannelTaskExecutor);
+ }
+
+
+ @TestConfiguration
+ static class KafkaChatHomeServiceTestConfiguration
+ {
+ @Bean
+ WorkAssignor infoChannelWorkAssignor()
+ {
+ return consumer ->
+ {
+ List<TopicPartition> partitions = consumer
+ .partitionsFor(INFO_TOPIC)
+ .stream()
+ .map(partitionInfo -> new TopicPartition(INFO_TOPIC, partitionInfo.partition()))
+ .toList();
+ consumer.assign(partitions);
+ };
+ }
}
}
import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import reactor.core.publisher.Mono;
import java.util.List;
@Slf4j
public abstract class KafkaTestUtils
{
+ public static void initKafkaSetup(
+ String infoTopic,
+ String dataTopic,
+ KafkaTemplate<String, String> messageTemplate,
+ ChannelTaskExecutor infoChannelTaskExecutor,
+ ChannelTaskExecutor dataChannelTaskExecutor)
+ {
+ KafkaTestUtils.sendAndLoadStoredData(
+ messageTemplate,
+ infoTopic,
+ dataTopic);
+
+ // The initialization of the channels must happen,
+ // after the messages were sent into the topics of the
+ // test-cluster.
+ // Otherwise, the initial loading of the data might be
+ // completed, before these messages arrive, so that
+ // they are ignored and the state is never restored.
+ infoChannelTaskExecutor.executeChannelTask();
+ dataChannelTaskExecutor.executeChannelTask();
+ }
+
public static class KafkaTestConfiguration
{
@Bean
- public ShardingPublisherStrategy shardingPublisherStrategy()
+ ShardingPublisherStrategy shardingPublisherStrategy()
{
return shard -> Mono.just("MOCKED!");
}
@Bean
- public WorkAssignor dataChannelWorkAssignor(
+ WorkAssignor dataChannelWorkAssignor(
ChatBackendProperties properties,
DataChannel dataChannel)
{
dataChannel.onPartitionsAssigned(assignedPartitions);
};
}
+
+ /**
+ * The definition of this bean has to be overruled, so
+ * that the configuration of the `initMethod`, which
+ * has to be called explicitly, _after_ the messages
+ * were sent to and received by the test-culster, can
+ * be dropped.
+ */
+ @Bean(destroyMethod = "join")
+ ChannelTaskExecutor infoChannelTaskExecutor(
+ ThreadPoolTaskExecutor taskExecutor,
+ InfoChannel infoChannel,
+ Consumer<String, AbstractMessageTo> infoChannelConsumer,
+ WorkAssignor infoChannelWorkAssignor)
+ {
+ return new ChannelTaskExecutor(
+ taskExecutor,
+ infoChannel,
+ infoChannelConsumer,
+ infoChannelWorkAssignor);
+ }
+
+ /**
+ * The definition of this bean has to be overruled, so
+ * that the configuration of the `initMethod`, which
+ * has to be called explicitly, _after_ the messages
+ * were sent to and received by the test-culster, can
+ * be dropped.
+ */
+ @Bean(destroyMethod = "join")
+ ChannelTaskExecutor dataChannelTaskExecutor(
+ ThreadPoolTaskExecutor taskExecutor,
+ DataChannel dataChannel,
+ Consumer<String, AbstractMessageTo> dataChannelConsumer,
+ WorkAssignor dataChannelWorkAssignor)
+ {
+ return new ChannelTaskExecutor(
+ taskExecutor,
+ dataChannel,
+ dataChannelConsumer,
+ dataChannelWorkAssignor);
+ }
}
- public static void sendAndLoadStoredData(
+ private static void sendAndLoadStoredData(
KafkaTemplate<String, String> messageTemplate,
String infoTopic,
String dataTopic)