import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
@Autowired
ConfigurableApplicationContext context;
+ @Autowired
+ ChatRoomChannel chatRoomChannel;
+ @Autowired
+ Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer;
@Autowired
ChatMessageChannel chatMessageChannel;
@Autowired
- ChatRoomChannel chatRoomChannel;
+ Consumer<String, MessageTo> chatMessageChannelConsumer;
CompletableFuture<Void> chatRoomChannelConsumerJob;
CompletableFuture<Void> chatMessageChannelConsumerJob;
@PreDestroy
public void joinChatRoomChannelConsumerJob()
{
+ log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
+ chatRoomChannelConsumer.wakeup();
log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
chatRoomChannelConsumerJob.join();
log.info("Joined the consumer of the ChatRoomChannel");
@PreDestroy
public void joinChatMessageChannelConsumerJob()
{
+ log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
+ chatMessageChannelConsumer.wakeup();
log.info("Waiting for the consumer of the ChatMessageChannel to finish its work");
chatMessageChannelConsumerJob.join();
log.info("Joined the consumer of the ChatMessageChannel");