* `KafkaServicesApplicationRunner` does not have to join the channel-tasks.
* The channel-tasks are already joined by `ChannelTaskExecutor.join()`
automatically, because the method is annotated with `@PreDestroy`.
* Simplified the test-configuration accordingly.
infoChannelTaskExecutor.executeChannelTask();
dataChannelTaskExecutor.executeChannelTask();
}
-
- public void joinChannels() throws InterruptedException
- {
- joinChannel(dataChannelTaskExecutor);
- joinChannel(infoChannelTaskExecutor);
- }
-
- private void joinChannel(
- ChannelTaskExecutor channelTaskExecutor)
- throws InterruptedException
- {
- Channel channel = channelTaskExecutor.getChannel();
- while (channel.getChannelState() != ChannelState.SHUTTING_DOWN)
- {
- log.info("Waiting for {} to shut down...", channel);
- Thread.sleep(1000);
- }
- channelTaskExecutor.join();
- }
}
package de.juplo.kafka.chat.backend.implementation.kafka;
-import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
public class KafkaServicesApplicationRunner implements ApplicationRunner
{
private final ChannelTaskRunner channelTaskRunner;
- private final Consumer dataChannelConsumer;
- private final Consumer infoChannelConsumer;
@Override
log.info("Executing channel-tasks");
channelTaskRunner.executeChannels();
}
-
- @PreDestroy
- public void joinChannels() throws InterruptedException
- {
- log.info("Closing consumers");
- dataChannelConsumer.close();
- infoChannelConsumer.close();
- log.info("Joining channel-tasks");
- channelTaskRunner.joinChannels();
- }
}
import de.juplo.kafka.chat.backend.implementation.kafka.*;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.beans.factory.annotation.Autowired;
@AfterAll
static void joinChannels(
- @Autowired Consumer dataChannelConsumer,
- @Autowired Consumer infoChannelConsumer,
- @Autowired ChannelTaskRunner channelTaskRunner)
- throws InterruptedException
+ @Autowired ChannelTaskExecutor dataChannelTaskExecutor,
+ @Autowired ChannelTaskExecutor infoChannelTaskExecutor)
{
- dataChannelConsumer.wakeup();
- infoChannelConsumer.wakeup();
- channelTaskRunner.joinChannels();
+ dataChannelTaskExecutor.join();
+ infoChannelTaskExecutor.join();
}
import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.beans.factory.annotation.Autowired;
@AfterAll
static void joinChannels(
- @Autowired Consumer dataChannelConsumer,
- @Autowired Consumer infoChannelConsumer,
- @Autowired ChannelTaskRunner channelTaskRunner)
- throws InterruptedException
+ @Autowired ChannelTaskExecutor dataChannelTaskExecutor,
+ @Autowired ChannelTaskExecutor infoChannelTaskExecutor)
{
- dataChannelConsumer.wakeup();
- infoChannelConsumer.wakeup();
- channelTaskRunner.joinChannels();
+ dataChannelTaskExecutor.join();
+ infoChannelTaskExecutor.join();
}
}