X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FKafkaTestUtils.java;h=1da6158bfe9bdc72f456c39dd9513c39a1df0e2f;hb=64ede95835a496e84857c38213dbf8ea451878e0;hp=956d7cecf917034d9a0415af421cb207da244f1e;hpb=f604c5ad4ce13cc7ca90816a0ed58b4de4caeec6;p=demos%2Fkafka%2Fchat diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java index 956d7cec..1da6158b 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java @@ -1,28 +1,29 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; +import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Import; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; +import reactor.core.publisher.Mono; -import java.time.Clock; import java.util.List; @Slf4j -public class KafkaTestUtils +public abstract class KafkaTestUtils { - @TestConfiguration - @EnableConfigurationProperties(ChatBackendProperties.class) - @Import(KafkaServicesConfiguration.class) public static class KafkaTestConfiguration { + @Bean + public ShardingPublisherStrategy shardingPublisherStrategy() + { + return shard -> Mono.just("MOCKED!"); + } + @Bean public WorkAssignor dataChannelWorkAssignor( ChatBackendProperties properties, @@ -36,12 +37,6 @@ public class KafkaTestUtils dataChannel.onPartitionsAssigned(assignedPartitions); }; } - - @Bean - public Clock clock() - { - return Clock.systemDefaultZone(); - } } @@ -49,7 +44,7 @@ public class KafkaTestUtils KafkaTemplate messageTemplate, String infoTopic, String dataTopic, - ConsumerTaskRunner consumerTaskRunner) + ChannelTaskRunner channelTaskRunner) { send(messageTemplate, infoTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created"); send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); @@ -57,7 +52,7 @@ public class KafkaTestUtils send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); - consumerTaskRunner.executeConsumerTasks(); + channelTaskRunner.executeChannel(); } private static void send( @@ -76,9 +71,4 @@ public class KafkaTestUtils value, new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition())); } - - public static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner) throws InterruptedException - { - consumerTaskRunner.joinConsumerTasks(); - } }