import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Import;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import reactor.core.publisher.Mono;
-import java.time.Clock;
import java.util.List;
@Slf4j
-public class KafkaTestUtils
+public abstract class KafkaTestUtils
{
- @TestConfiguration
- @EnableConfigurationProperties(ChatBackendProperties.class)
- @Import(KafkaServicesConfiguration.class)
public static class KafkaTestConfiguration
{
@Bean
dataChannel.onPartitionsAssigned(assignedPartitions);
};
}
-
- @Bean
- public Clock clock()
- {
- return Clock.systemDefaultZone();
- }
}
KafkaTemplate<String, String> messageTemplate,
String infoTopic,
String dataTopic,
- ConsumerTaskRunner consumerTaskRunner)
+ ChannelTaskRunner channelTaskRunner)
{
send(messageTemplate, infoTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created");
send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
- consumerTaskRunner.executeConsumerTasks();
+ channelTaskRunner.executeChannel();
}
private static void send(
value,
new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
}
-
- public static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner) throws InterruptedException
- {
- consumerTaskRunner.joinConsumerTasks();
- }
}