From: Kai Moritz Date: Mon, 4 Mar 2024 13:34:34 +0000 (+0100) Subject: refactor: Refined channel-states, introduced `ChannelState` -- ALIGN X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=0d14c8efe97dd418aa2556916b8a7c47ea8b08b7;p=demos%2Fkafka%2Fchat refactor: Refined channel-states, introduced `ChannelState` -- ALIGN * Renamed attributes and method-names according to the class-renames. * Introduced interface `Channel` and `enum ChannelState`. * `Data` - and `InfoChannel` maintain a `ChannelState`, instead just a plain boolean, that only reflects the loading-state. * The `ChannelTaskRunner` waits, until both channels entered the State `ChannelState.SHUTTING_DOWN`. --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/Channel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/Channel.java new file mode 100644 index 00000000..6ba42cc9 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/Channel.java @@ -0,0 +1,6 @@ +package de.juplo.kafka.chat.backend.implementation.kafka; + +public interface Channel extends Runnable +{ + ChannelState getChannelState(); +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelNotReadyException.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelNotReadyException.java index 8a0a81f9..0746748a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelNotReadyException.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelNotReadyException.java @@ -1,10 +1,13 @@ -package de.juplo.kafka.chat.backend.domain.exceptions; +package de.juplo.kafka.chat.backend.implementation.kafka; -public class LoadInProgressException extends IllegalStateException +public class ChannelNotReadyException extends IllegalStateException { - public LoadInProgressException() + public final ChannelState state; + + public ChannelNotReadyException(ChannelState state) { - super("Load in progress..."); + super("Not ready! Current state: " + state); + this.state = state; } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelState.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelState.java new file mode 100644 index 00000000..554b4d66 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelState.java @@ -0,0 +1,9 @@ +package de.juplo.kafka.chat.backend.implementation.kafka; + +public enum ChannelState +{ + STARTING, + LOAD_IN_PROGRESS, + READY, + SHUTTING_DOWN +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java index 9425bdf4..636c03bc 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java @@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import jakarta.annotation.PreDestroy; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; @@ -12,36 +13,37 @@ import java.util.concurrent.CompletableFuture; @RequiredArgsConstructor @Slf4j -public class ConsumerTaskExecutor +public class ChannelTaskExecutor { private final ThreadPoolTaskExecutor taskExecutor; - private final Runnable consumerTask; + @Getter + private final Channel channel; private final Consumer consumer; private final WorkAssignor workAssignor; - CompletableFuture consumerTaskJob; + CompletableFuture channelTaskJob; - public void executeConsumerTask() + public void executeChannelTask() { workAssignor.assignWork(consumer); - log.info("Starting the consumer-task for {}", consumerTask); - consumerTaskJob = taskExecutor - .submitCompletable(consumerTask) + log.info("Starting the consumer-task for {}", channel); + channelTaskJob = taskExecutor + .submitCompletable(channel) .exceptionally(e -> { - log.error("The consumer-task for {} exited abnormally!", consumerTask, e); + log.error("The consumer-task for {} exited abnormally!", channel, e); return null; }); } @PreDestroy - public void joinConsumerTaskJob() + public void join() { - log.info("Signaling the consumer-task for {} to quit its work", consumerTask); + log.info("Signaling the consumer-task for {} to quit its work", channel); consumer.wakeup(); - log.info("Waiting for the consumer of {} to finish its work", consumerTask); - consumerTaskJob.join(); - log.info("Joined the consumer-task for {}", consumerTask); + log.info("Waiting for the consumer of {} to finish its work", channel); + channelTaskJob.join(); + log.info("Joined the consumer-task for {}", channel); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java index c2c28014..5e565281 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java @@ -6,26 +6,33 @@ import lombok.extern.slf4j.Slf4j; @RequiredArgsConstructor @Slf4j -public class ConsumerTaskRunner +public class ChannelTaskRunner { - private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor; - private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor; - private final InfoChannel infoChannel; + private final ChannelTaskExecutor infoChannelTaskExecutor; + private final ChannelTaskExecutor dataChannelTaskExecutor; - public void executeConsumerTasks() + public void executeChannels() { - infoChannelConsumerTaskExecutor.executeConsumerTask(); - dataChannelConsumerTaskExecutor.executeConsumerTask(); + infoChannelTaskExecutor.executeChannelTask(); + dataChannelTaskExecutor.executeChannelTask(); } - public void joinConsumerTasks() throws InterruptedException + public void joinChannels() throws InterruptedException { - dataChannelConsumerTaskExecutor.joinConsumerTaskJob(); - while (infoChannel.isLoadInProgress()) + joinChannel(dataChannelTaskExecutor); + joinChannel(infoChannelTaskExecutor); + } + + private void joinChannel( + ChannelTaskExecutor channelTaskExecutor) + throws InterruptedException + { + Channel channel = channelTaskExecutor.getChannel(); + while (channel.getChannelState() != ChannelState.SHUTTING_DOWN) { - log.info("Waiting for {} to finish loading...", infoChannel); + log.info("Waiting for {} to shut down...", channel); Thread.sleep(1000); } - infoChannelConsumerTaskExecutor.joinConsumerTaskJob(); + channelTaskExecutor.join(); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index e1754a15..2468af5a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -1,11 +1,14 @@ package de.juplo.kafka.chat.backend.implementation.kafka; -import de.juplo.kafka.chat.backend.domain.*; -import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; +import de.juplo.kafka.chat.backend.domain.ChatRoomData; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; import lombok.Getter; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.Producer; @@ -15,12 +18,16 @@ import org.apache.kafka.common.errors.WakeupException; import reactor.core.publisher.Mono; import java.time.*; -import java.util.*; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; import java.util.stream.IntStream; +@ToString(of = { "topic", "instanceId" }) @Slf4j -public class DataChannel implements Runnable, ConsumerRebalanceListener +public class DataChannel implements Channel, ConsumerRebalanceListener { private final String instanceId; private final String topic; @@ -40,7 +47,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener private boolean running; @Getter - private volatile boolean loadInProgress; + private volatile ChannelState channelState = ChannelState.STARTING; public DataChannel( @@ -129,7 +136,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener public void onPartitionsAssigned(Collection partitions) { log.info("Newly assigned partitions! Pausing normal operations..."); - loadInProgress = true; + channelState = ChannelState.LOAD_IN_PROGRESS; consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) -> { @@ -196,29 +203,34 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener ConsumerRecords records = consumer.poll(pollingInterval); log.info("Fetched {} messages", records.count()); - if (loadInProgress) + switch (channelState) { - loadChatRoomData(records); - - if (isLoadingCompleted()) + case LOAD_IN_PROGRESS -> { - log.info("Loading of messages completed! Pausing all owned partitions..."); - pauseAllOwnedPartions(); - log.info("Resuming normal operations..."); - loadInProgress = false; + loadChatRoomData(records); + + if (isLoadingCompleted()) + { + log.info("Loading of messages completed! Pausing all owned partitions..."); + pauseAllOwnedPartions(); + log.info("Resuming normal operations..."); + channelState = ChannelState.READY; + } } - } - else - { - if (!records.isEmpty()) + case SHUTTING_DOWN -> log.info("Shutdown in progress: ignoring {} fetched messages.", records.count()); + default -> { - throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!"); + if (!records.isEmpty()) + { + throw new IllegalStateException("All owned partitions should be paused, when in state " + channelState); + } } } } catch (WakeupException e) { log.info("Received WakeupException, exiting!"); + channelState = ChannelState.SHUTTING_DOWN; running = false; } } @@ -317,9 +329,10 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener Mono getChatRoomData(int shard, UUID id) { - if (loadInProgress) + ChannelState capturedState = channelState; + if (capturedState != ChannelState.READY) { - return Mono.error(new LoadInProgressException()); + return Mono.error(new ChannelNotReadyException(capturedState)); } if (!isShardOwned[shard]) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index f3150ce2..7665faea 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -1,33 +1,33 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; -import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned; import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked; import lombok.Getter; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.errors.WakeupException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.net.URI; -import java.time.*; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.stream.IntStream; +@ToString(of = { "topic", "instanceUri" }) @Slf4j -public class InfoChannel implements Runnable +public class InfoChannel implements Channel { private final String topic; private final Producer producer; @@ -43,7 +43,7 @@ public class InfoChannel implements Runnable private boolean running; @Getter - private volatile boolean loadInProgress = true; + private volatile ChannelState channelState = ChannelState.STARTING; public InfoChannel( @@ -193,7 +193,7 @@ public class InfoChannel implements Runnable IntStream .range(0, numShards) .forEach(partition -> this.nextOffset[partition] = 0l); - loadInProgress = true; + channelState = ChannelState.LOAD_IN_PROGRESS; while (running) { @@ -210,6 +210,7 @@ public class InfoChannel implements Runnable catch (WakeupException e) { log.info("Received WakeupException, exiting!"); + channelState = ChannelState.SHUTTING_DOWN; running = false; } } @@ -220,10 +221,15 @@ public class InfoChannel implements Runnable private void updateNextOffset(int partition, long nextOffset) { this.nextOffset[partition] = nextOffset; - if (loadInProgress) { - loadInProgress = IntStream + if (channelState == ChannelState.LOAD_IN_PROGRESS) + { + boolean loadInProgress = IntStream .range(0, numShards) .anyMatch(shard -> this.nextOffset[shard] < currentOffset[partition]); + if (!loadInProgress) + { + channelState = ChannelState.READY; + } } } @@ -297,9 +303,10 @@ public class InfoChannel implements Runnable Mono getChatRoomInfo(UUID id) { - if (loadInProgress) + ChannelState capturedState = channelState; + if (capturedState != ChannelState.READY) { - return Mono.error(new LoadInProgressException()); + return Mono.error(new ChannelNotReadyException(capturedState)); } return Mono.fromSupplier(() -> chatRoomInfo.get(id)); diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java index 69e94c9e..badaeedc 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java @@ -16,18 +16,18 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor public class KafkaServicesApplicationRunner implements ApplicationRunner { - private final ConsumerTaskRunner consumerTaskRunner; + private final ChannelTaskRunner channelTaskRunner; @Override public void run(ApplicationArguments args) { - consumerTaskRunner.executeConsumerTasks(); + channelTaskRunner.executeChannels(); } @PreDestroy - public void joinConsumerTasks() throws InterruptedException + public void joinChannels() throws InterruptedException { - consumerTaskRunner.joinConsumerTasks(); + channelTaskRunner.joinChannels(); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 33371279..c7cf113a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -1,7 +1,6 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; -import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy; import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; @@ -40,25 +39,23 @@ import java.util.Properties; public class KafkaServicesConfiguration { @Bean - ConsumerTaskRunner consumerTaskRunner( - ConsumerTaskExecutor infoChannelConsumerTaskExecutor, - ConsumerTaskExecutor dataChannelConsumerTaskExecutor, - InfoChannel infoChannel) + ChannelTaskRunner channelTaskRunner( + ChannelTaskExecutor infoChannelTaskExecutor, + ChannelTaskExecutor dataChannelTaskExecutor) { - return new ConsumerTaskRunner( - infoChannelConsumerTaskExecutor, - dataChannelConsumerTaskExecutor, - infoChannel); + return new ChannelTaskRunner( + infoChannelTaskExecutor, + dataChannelTaskExecutor); } @Bean - ConsumerTaskExecutor infoChannelConsumerTaskExecutor( + ChannelTaskExecutor infoChannelTaskExecutor( ThreadPoolTaskExecutor taskExecutor, InfoChannel infoChannel, Consumer infoChannelConsumer, WorkAssignor infoChannelWorkAssignor) { - return new ConsumerTaskExecutor( + return new ChannelTaskExecutor( taskExecutor, infoChannel, infoChannelConsumer, @@ -82,13 +79,13 @@ public class KafkaServicesConfiguration } @Bean - ConsumerTaskExecutor dataChannelConsumerTaskExecutor( + ChannelTaskExecutor dataChannelTaskExecutor( ThreadPoolTaskExecutor taskExecutor, DataChannel dataChannel, Consumer dataChannelConsumer, WorkAssignor dataChannelWorkAssignor) { - return new ConsumerTaskExecutor( + return new ChannelTaskExecutor( taskExecutor, dataChannel, dataChannelConsumer, diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java index e01e012d..0dfba9da 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java @@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend; import de.juplo.kafka.chat.backend.implementation.kafka.*; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.springframework.beans.factory.annotation.Autowired; @@ -43,21 +44,25 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT @BeforeAll public static void sendAndLoadStoredData( @Autowired KafkaTemplate messageTemplate, - @Autowired ConsumerTaskRunner consumerTaskRunner) + @Autowired ChannelTaskRunner channelTaskRunner) { KafkaTestUtils.sendAndLoadStoredData( messageTemplate, INFO_TOPIC, DATA_TOPIC, - consumerTaskRunner); + channelTaskRunner); } @AfterAll - static void joinConsumerTasks( - @Autowired ConsumerTaskRunner consumerTaskRunner) + static void joinChannels( + @Autowired Consumer dataChannelConsumer, + @Autowired Consumer infoChannelConsumer, + @Autowired ChannelTaskRunner channelTaskRunner) throws InterruptedException { - KafkaTestUtils.joinConsumerTasks(consumerTaskRunner); + dataChannelConsumer.wakeup(); + infoChannelConsumer.wakeup(); + channelTaskRunner.joinChannels(); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceTest.java index d859b141..3be9a35a 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceTest.java @@ -3,7 +3,7 @@ package de.juplo.kafka.chat.backend.domain; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import de.juplo.kafka.chat.backend.ChatBackendProperties; -import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; +import de.juplo.kafka.chat.backend.implementation.kafka.ChannelNotReadyException; import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException; import de.juplo.kafka.chat.backend.implementation.inmemory.InMemoryServicesConfiguration; import de.juplo.kafka.chat.backend.implementation.kafka.KafkaServicesConfiguration; @@ -49,7 +49,7 @@ public abstract class ChatHomeServiceTest .log("testGetExistingChatroom") .retryWhen(Retry .backoff(5, Duration.ofSeconds(1)) - .filter(throwable -> throwable instanceof LoadInProgressException)); + .filter(throwable -> throwable instanceof ChannelNotReadyException)); // Then assertThat(mono).emitsCount(1); @@ -68,7 +68,7 @@ public abstract class ChatHomeServiceTest .log("testGetNonExistentChatroom") .retryWhen(Retry .backoff(5, Duration.ofSeconds(1)) - .filter(throwable -> throwable instanceof LoadInProgressException)); + .filter(throwable -> throwable instanceof ChannelNotReadyException)); // Then assertThat(mono).sendsError(e -> diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceWithShardsTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceWithShardsTest.java index 5eeac471..6d156750 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceWithShardsTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceWithShardsTest.java @@ -1,6 +1,6 @@ package de.juplo.kafka.chat.backend.domain; -import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; +import de.juplo.kafka.chat.backend.implementation.kafka.ChannelNotReadyException; import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -33,7 +33,7 @@ public abstract class ChatHomeServiceWithShardsTest extends ChatHomeServiceTest .log("testGetChatroomForNotOwnedShard") .retryWhen(Retry .backoff(5, Duration.ofSeconds(1)) - .filter(throwable -> throwable instanceof LoadInProgressException)); + .filter(throwable -> throwable instanceof ChannelNotReadyException)); // Then assertThat(mono).sendsError(e -> diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java index 180ff152..a017cf35 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java @@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.springframework.beans.factory.annotation.Autowired; @@ -44,20 +45,24 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest @BeforeAll static void sendAndLoadStoredData( @Autowired KafkaTemplate messageTemplate, - @Autowired ConsumerTaskRunner consumerTaskRunner) + @Autowired ChannelTaskRunner channelTaskRunner) { KafkaTestUtils.sendAndLoadStoredData( messageTemplate, INFO_TOPIC, DATA_TOPIC, - consumerTaskRunner); + channelTaskRunner); } @AfterAll - static void joinConsumerTasks( - @Autowired ConsumerTaskRunner consumerTaskRunner) + static void joinChannels( + @Autowired Consumer dataChannelConsumer, + @Autowired Consumer infoChannelConsumer, + @Autowired ChannelTaskRunner channelTaskRunner) throws InterruptedException { - KafkaTestUtils.joinConsumerTasks(consumerTaskRunner); + dataChannelConsumer.wakeup(); + infoChannelConsumer.wakeup(); + channelTaskRunner.joinChannels(); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java index 2ede2029..6ea4772e 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java @@ -44,7 +44,7 @@ public abstract class KafkaTestUtils KafkaTemplate messageTemplate, String infoTopic, String dataTopic, - ConsumerTaskRunner consumerTaskRunner) + ChannelTaskRunner channelTaskRunner) { send(messageTemplate, infoTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created"); send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received"); @@ -52,7 +52,7 @@ public abstract class KafkaTestUtils send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received"); send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received"); - consumerTaskRunner.executeConsumerTasks(); + channelTaskRunner.executeChannels(); } private static void send( @@ -71,9 +71,4 @@ public abstract class KafkaTestUtils value, new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition())); } - - public static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner) throws InterruptedException - { - consumerTaskRunner.joinConsumerTasks(); - } }