* Removed class `ChannelTaskRunner` and `KafkaServicesApplicationRunner`.
* Instead, the method `ChannelTaskExecutor.excuteChannelTask()` is executed
as `@Bean.initMethod` by Spring.
* Adapted the test-cases accordingly:
** Joinig the channel-tasks is not necessary any more, because that is
done by the imported production-config
** `KafkaConfigurationIT` has to call `executeChannelTasks()`
explicitly
** Therefore, it has to overrule the default-config for the bean
`dataChannelTaskExecutor` in order to drop the configuration of the
`initMethod`.
** Otherwise, the test would (might) not restore the data from the topic,
because the messages, that are send into the test-cluster, might arrive
only after the initial loading of the data is done.
+++ /dev/null
-package de.juplo.kafka.chat.backend.implementation.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class ChannelTaskRunner
-{
- private final ChannelTaskExecutor infoChannelTaskExecutor;
- private final ChannelTaskExecutor dataChannelTaskExecutor;
-
- public void executeChannels()
- {
- infoChannelTaskExecutor.executeChannelTask();
- dataChannelTaskExecutor.executeChannelTask();
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.implementation.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.stereotype.Component;
-
-
-@ConditionalOnProperty(
- prefix = "chat.backend",
- name = "services",
- havingValue = "kafka")
-@Component
-@RequiredArgsConstructor
-@Slf4j
-public class KafkaServicesApplicationRunner implements ApplicationRunner
-{
- private final ChannelTaskRunner channelTaskRunner;
-
-
- @Override
- public void run(ApplicationArguments args)
- {
- log.info("Executing channel-tasks");
- channelTaskRunner.executeChannels();
- }
-}
return new KafkaServicesThreadPoolTaskExecutorCustomizer();
}
- @Bean
- ChannelTaskRunner channelTaskRunner(
- ChannelTaskExecutor infoChannelTaskExecutor,
- ChannelTaskExecutor dataChannelTaskExecutor)
- {
- return new ChannelTaskRunner(
- infoChannelTaskExecutor,
- dataChannelTaskExecutor);
- }
-
- @Bean(destroyMethod = "join")
+ @Bean(initMethod = "executeChannelTask", destroyMethod = "join")
ChannelTaskExecutor infoChannelTaskExecutor(
ThreadPoolTaskExecutor taskExecutor,
InfoChannel infoChannel,
};
}
- @Bean(destroyMethod = "join")
+ @Bean(initMethod = "executeChannelTask", destroyMethod = "join")
ChannelTaskExecutor dataChannelTaskExecutor(
ThreadPoolTaskExecutor taskExecutor,
DataChannel dataChannel,
package de.juplo.kafka.chat.backend;
-import de.juplo.kafka.chat.backend.implementation.kafka.*;
+import de.juplo.kafka.chat.backend.implementation.kafka.ChannelTaskExecutor;
+import de.juplo.kafka.chat.backend.implementation.kafka.DataChannel;
+import de.juplo.kafka.chat.backend.implementation.kafka.KafkaTestUtils;
+import de.juplo.kafka.chat.backend.implementation.kafka.WorkAssignor;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.AfterAll;
+import org.apache.kafka.clients.consumer.Consumer;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.DATA_TOPIC;
import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.INFO_TOPIC;
final static String INFO_TOPIC = "KAFKA_CONFIGURATION_IT_INFO_CHANNEL";
final static String DATA_TOPIC = "KAFKA_CONFIGURATION_IT_DATA_CHANNEL";
- @MockBean
- KafkaServicesApplicationRunner kafkaServicesApplicationRunner;
-
@BeforeAll
- public static void sendAndLoadStoredData(
+ static void sendAndLoadStoredData(
@Autowired KafkaTemplate<String, String> messageTemplate,
- @Autowired ChannelTaskRunner channelTaskRunner)
+ @Autowired ChannelTaskExecutor dataChannelTaskExecutor)
{
KafkaTestUtils.sendAndLoadStoredData(
messageTemplate,
INFO_TOPIC,
- DATA_TOPIC,
- channelTaskRunner);
- }
+ DATA_TOPIC);
- @AfterAll
- static void joinChannels(
- @Autowired ChannelTaskExecutor dataChannelTaskExecutor,
- @Autowired ChannelTaskExecutor infoChannelTaskExecutor)
- {
- dataChannelTaskExecutor.join();
- infoChannelTaskExecutor.join();
+ // The initialization of the data-channel must happen,
+ // after the messages were sent into the topic of the
+ // test-cluster.
+ // Otherwise, the initial loading of the data might be
+ // completed, before these messages arrive, so that
+ // they are ignored and the state is never restored.
+ dataChannelTaskExecutor.executeChannelTask();
}
@Import(KafkaTestUtils.KafkaTestConfiguration.class)
static class KafkaConfigurationITConfiguration
{
+ /**
+ * The definition of this bean has to be overruled, so
+ * that the configuration of the `initMethod`, which
+ * has to be called explicitly, _after_ the messages
+ * were sent to and received by the test-culster, can
+ * be dropped.
+ */
+ @Bean(destroyMethod = "join")
+ ChannelTaskExecutor dataChannelTaskExecutor(
+ ThreadPoolTaskExecutor taskExecutor,
+ DataChannel dataChannel,
+ Consumer<String, AbstractMessageTo> dataChannelConsumer,
+ WorkAssignor dataChannelWorkAssignor)
+ {
+ return new ChannelTaskExecutor(
+ taskExecutor,
+ dataChannel,
+ dataChannelConsumer,
+ dataChannelWorkAssignor);
+ }
}
}
import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest;
import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
@BeforeAll
static void sendAndLoadStoredData(
- @Autowired KafkaTemplate<String, String> messageTemplate,
- @Autowired ChannelTaskRunner channelTaskRunner)
+ @Autowired KafkaTemplate<String, String> messageTemplate)
{
KafkaTestUtils.sendAndLoadStoredData(
messageTemplate,
INFO_TOPIC,
- DATA_TOPIC,
- channelTaskRunner);
- }
-
- @AfterAll
- static void joinChannels(
- @Autowired ChannelTaskExecutor dataChannelTaskExecutor,
- @Autowired ChannelTaskExecutor infoChannelTaskExecutor)
- {
- dataChannelTaskExecutor.join();
- infoChannelTaskExecutor.join();
+ DATA_TOPIC);
}
}
public static void sendAndLoadStoredData(
KafkaTemplate<String, String> messageTemplate,
String infoTopic,
- String dataTopic,
- ChannelTaskRunner channelTaskRunner)
+ String dataTopic)
{
send(messageTemplate, infoTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created");
send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
-
- channelTaskRunner.executeChannels();
}
private static void send(