refactor: Refined channel-states, introduced `ChannelState` -- ALIGN
authorKai Moritz <kai@juplo.de>
Mon, 4 Mar 2024 13:34:34 +0000 (14:34 +0100)
committerKai Moritz <kai@juplo.de>
Thu, 14 Mar 2024 08:11:21 +0000 (09:11 +0100)
* Renamed attributes and method-names according to the class-renames.
* Introduced interface `Channel` and `enum ChannelState`.
* `Data` - and `InfoChannel` maintain a `ChannelState`, instead just a
  plain boolean, that only reflects the loading-state.
* The `ChannelTaskRunner` waits, until both channels entered the State
  `ChannelState.SHUTTING_DOWN`.

14 files changed:
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/Channel.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelNotReadyException.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelState.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskExecutor.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelTaskRunner.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceTest.java
src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceWithShardsTest.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java

diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/Channel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/Channel.java
new file mode 100644 (file)
index 0000000..6ba42cc
--- /dev/null
@@ -0,0 +1,6 @@
+package de.juplo.kafka.chat.backend.implementation.kafka;
+
+public interface Channel extends Runnable
+{
+  ChannelState getChannelState();
+}
index 8a0a81f..0746748 100644 (file)
@@ -1,10 +1,13 @@
-package de.juplo.kafka.chat.backend.domain.exceptions;
+package de.juplo.kafka.chat.backend.implementation.kafka;
 
 
-public class LoadInProgressException extends IllegalStateException
+public class ChannelNotReadyException extends IllegalStateException
 {
-  public LoadInProgressException()
+  public final ChannelState state;
+
+  public ChannelNotReadyException(ChannelState state)
   {
-    super("Load in progress...");
+    super("Not ready! Current state: " + state);
+    this.state = state;
   }
 }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelState.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelState.java
new file mode 100644 (file)
index 0000000..554b4d6
--- /dev/null
@@ -0,0 +1,9 @@
+package de.juplo.kafka.chat.backend.implementation.kafka;
+
+public enum ChannelState
+{
+  STARTING,
+  LOAD_IN_PROGRESS,
+  READY,
+  SHUTTING_DOWN
+}
index 9425bdf..636c03b 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import jakarta.annotation.PreDestroy;
+import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -12,36 +13,37 @@ import java.util.concurrent.CompletableFuture;
 
 @RequiredArgsConstructor
 @Slf4j
-public class ConsumerTaskExecutor
+public class ChannelTaskExecutor
 {
   private final ThreadPoolTaskExecutor taskExecutor;
-  private final Runnable consumerTask;
+  @Getter
+  private final Channel channel;
   private final Consumer<String, AbstractMessageTo> consumer;
   private final WorkAssignor workAssignor;
 
-  CompletableFuture<Void> consumerTaskJob;
+  CompletableFuture<Void> channelTaskJob;
 
 
-  public void executeConsumerTask()
+  public void executeChannelTask()
   {
     workAssignor.assignWork(consumer);
-    log.info("Starting the consumer-task for {}", consumerTask);
-    consumerTaskJob = taskExecutor
-        .submitCompletable(consumerTask)
+    log.info("Starting the consumer-task for {}", channel);
+    channelTaskJob = taskExecutor
+        .submitCompletable(channel)
         .exceptionally(e ->
         {
-          log.error("The consumer-task for {} exited abnormally!", consumerTask, e);
+          log.error("The consumer-task for {} exited abnormally!", channel, e);
           return null;
         });
   }
 
   @PreDestroy
-  public void joinConsumerTaskJob()
+  public void join()
   {
-    log.info("Signaling the consumer-task for {} to quit its work", consumerTask);
+    log.info("Signaling the consumer-task for {} to quit its work", channel);
     consumer.wakeup();
-    log.info("Waiting for the consumer of {} to finish its work", consumerTask);
-    consumerTaskJob.join();
-    log.info("Joined the consumer-task for {}", consumerTask);
+    log.info("Waiting for the consumer of {} to finish its work", channel);
+    channelTaskJob.join();
+    log.info("Joined the consumer-task for {}", channel);
   }
 }
index c2c2801..5e56528 100644 (file)
@@ -6,26 +6,33 @@ import lombok.extern.slf4j.Slf4j;
 
 @RequiredArgsConstructor
 @Slf4j
-public class ConsumerTaskRunner
+public class ChannelTaskRunner
 {
-  private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor;
-  private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor;
-  private final InfoChannel infoChannel;
+  private final ChannelTaskExecutor infoChannelTaskExecutor;
+  private final ChannelTaskExecutor dataChannelTaskExecutor;
 
-  public void executeConsumerTasks()
+  public void executeChannels()
   {
-    infoChannelConsumerTaskExecutor.executeConsumerTask();
-    dataChannelConsumerTaskExecutor.executeConsumerTask();
+    infoChannelTaskExecutor.executeChannelTask();
+    dataChannelTaskExecutor.executeChannelTask();
   }
 
-  public void joinConsumerTasks() throws InterruptedException
+  public void joinChannels() throws InterruptedException
   {
-    dataChannelConsumerTaskExecutor.joinConsumerTaskJob();
-    while (infoChannel.isLoadInProgress())
+    joinChannel(dataChannelTaskExecutor);
+    joinChannel(infoChannelTaskExecutor);
+  }
+
+  private void joinChannel(
+      ChannelTaskExecutor channelTaskExecutor)
+      throws InterruptedException
+  {
+    Channel channel = channelTaskExecutor.getChannel();
+    while (channel.getChannelState() != ChannelState.SHUTTING_DOWN)
     {
-      log.info("Waiting for {} to finish loading...", infoChannel);
+      log.info("Waiting for {} to shut down...", channel);
       Thread.sleep(1000);
     }
-    infoChannelConsumerTaskExecutor.joinConsumerTaskJob();
+    channelTaskExecutor.join();
   }
 }
index e1754a1..2468af5 100644 (file)
@@ -1,11 +1,14 @@
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
-import de.juplo.kafka.chat.backend.domain.*;
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
+import de.juplo.kafka.chat.backend.domain.ChatRoomData;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
 import lombok.Getter;
+import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.clients.producer.Producer;
@@ -15,12 +18,16 @@ import org.apache.kafka.common.errors.WakeupException;
 import reactor.core.publisher.Mono;
 
 import java.time.*;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
 import java.util.stream.IntStream;
 
 
+@ToString(of = { "topic", "instanceId" })
 @Slf4j
-public class DataChannel implements Runnable, ConsumerRebalanceListener
+public class DataChannel implements Channel, ConsumerRebalanceListener
 {
   private final String instanceId;
   private final String topic;
@@ -40,7 +47,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
   private boolean running;
   @Getter
-  private volatile boolean loadInProgress;
+  private volatile ChannelState channelState = ChannelState.STARTING;
 
 
   public DataChannel(
@@ -129,7 +136,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
   {
     log.info("Newly assigned partitions! Pausing normal operations...");
-    loadInProgress = true;
+    channelState = ChannelState.LOAD_IN_PROGRESS;
 
     consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
     {
@@ -196,29 +203,34 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
         ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(pollingInterval);
         log.info("Fetched {} messages", records.count());
 
-        if (loadInProgress)
+        switch (channelState)
         {
-          loadChatRoomData(records);
-
-          if (isLoadingCompleted())
+          case LOAD_IN_PROGRESS ->
           {
-            log.info("Loading of messages completed! Pausing all owned partitions...");
-            pauseAllOwnedPartions();
-            log.info("Resuming normal operations...");
-            loadInProgress = false;
+            loadChatRoomData(records);
+
+            if (isLoadingCompleted())
+            {
+              log.info("Loading of messages completed! Pausing all owned partitions...");
+              pauseAllOwnedPartions();
+              log.info("Resuming normal operations...");
+              channelState = ChannelState.READY;
+            }
           }
-        }
-        else
-        {
-          if (!records.isEmpty())
+          case SHUTTING_DOWN -> log.info("Shutdown in progress: ignoring {} fetched messages.", records.count());
+          default ->
           {
-            throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
+            if (!records.isEmpty())
+            {
+              throw new IllegalStateException("All owned partitions should be paused, when in state " + channelState);
+            }
           }
         }
       }
       catch (WakeupException e)
       {
         log.info("Received WakeupException, exiting!");
+        channelState = ChannelState.SHUTTING_DOWN;
         running = false;
       }
     }
@@ -317,9 +329,10 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
   Mono<ChatRoomData> getChatRoomData(int shard, UUID id)
   {
-    if (loadInProgress)
+    ChannelState capturedState = channelState;
+    if (capturedState != ChannelState.READY)
     {
-      return Mono.error(new LoadInProgressException());
+      return Mono.error(new ChannelNotReadyException(capturedState));
     }
 
     if (!isShardOwned[shard])
index f3150ce..7665fae 100644 (file)
@@ -1,33 +1,33 @@
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
 import lombok.Getter;
+import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.errors.WakeupException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.net.URI;
-import java.time.*;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.stream.IntStream;
 
 
+@ToString(of = { "topic", "instanceUri" })
 @Slf4j
-public class InfoChannel implements Runnable
+public class InfoChannel implements Channel
 {
   private final String topic;
   private final Producer<String, AbstractMessageTo> producer;
@@ -43,7 +43,7 @@ public class InfoChannel implements Runnable
 
   private boolean running;
   @Getter
-  private volatile boolean loadInProgress = true;
+  private volatile ChannelState channelState = ChannelState.STARTING;
 
 
   public InfoChannel(
@@ -193,7 +193,7 @@ public class InfoChannel implements Runnable
     IntStream
         .range(0, numShards)
         .forEach(partition -> this.nextOffset[partition] = 0l);
-    loadInProgress = true;
+    channelState = ChannelState.LOAD_IN_PROGRESS;
 
     while (running)
     {
@@ -210,6 +210,7 @@ public class InfoChannel implements Runnable
       catch (WakeupException e)
       {
         log.info("Received WakeupException, exiting!");
+        channelState = ChannelState.SHUTTING_DOWN;
         running = false;
       }
     }
@@ -220,10 +221,15 @@ public class InfoChannel implements Runnable
   private void updateNextOffset(int partition, long nextOffset)
   {
     this.nextOffset[partition] = nextOffset;
-    if (loadInProgress) {
-      loadInProgress = IntStream
+    if (channelState == ChannelState.LOAD_IN_PROGRESS)
+    {
+      boolean loadInProgress = IntStream
           .range(0, numShards)
           .anyMatch(shard -> this.nextOffset[shard] < currentOffset[partition]);
+      if (!loadInProgress)
+      {
+        channelState = ChannelState.READY;
+      }
     }
   }
 
@@ -297,9 +303,10 @@ public class InfoChannel implements Runnable
 
   Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
   {
-    if (loadInProgress)
+    ChannelState capturedState = channelState;
+    if (capturedState != ChannelState.READY)
     {
-      return Mono.error(new LoadInProgressException());
+      return Mono.error(new ChannelNotReadyException(capturedState));
     }
 
     return Mono.fromSupplier(() -> chatRoomInfo.get(id));
index 69e94c9..badaeed 100644 (file)
@@ -16,18 +16,18 @@ import org.springframework.stereotype.Component;
 @RequiredArgsConstructor
 public class KafkaServicesApplicationRunner implements ApplicationRunner
 {
-  private final ConsumerTaskRunner consumerTaskRunner;
+  private final ChannelTaskRunner channelTaskRunner;
 
 
   @Override
   public void run(ApplicationArguments args)
   {
-    consumerTaskRunner.executeConsumerTasks();
+    channelTaskRunner.executeChannels();
   }
 
   @PreDestroy
-  public void joinConsumerTasks() throws InterruptedException
+  public void joinChannels() throws InterruptedException
   {
-    consumerTaskRunner.joinConsumerTasks();
+    channelTaskRunner.joinChannels();
   }
 }
index 3337127..c7cf113 100644 (file)
@@ -1,7 +1,6 @@
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
 import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
 import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
@@ -40,25 +39,23 @@ import java.util.Properties;
 public class KafkaServicesConfiguration
 {
   @Bean
-  ConsumerTaskRunner consumerTaskRunner(
-      ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
-      ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
-      InfoChannel infoChannel)
+  ChannelTaskRunner channelTaskRunner(
+      ChannelTaskExecutor infoChannelTaskExecutor,
+      ChannelTaskExecutor dataChannelTaskExecutor)
   {
-    return new ConsumerTaskRunner(
-        infoChannelConsumerTaskExecutor,
-        dataChannelConsumerTaskExecutor,
-        infoChannel);
+    return new ChannelTaskRunner(
+        infoChannelTaskExecutor,
+        dataChannelTaskExecutor);
   }
 
   @Bean
-  ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
+  ChannelTaskExecutor infoChannelTaskExecutor(
       ThreadPoolTaskExecutor taskExecutor,
       InfoChannel infoChannel,
       Consumer<String, AbstractMessageTo> infoChannelConsumer,
       WorkAssignor infoChannelWorkAssignor)
   {
-    return new ConsumerTaskExecutor(
+    return new ChannelTaskExecutor(
         taskExecutor,
         infoChannel,
         infoChannelConsumer,
@@ -82,13 +79,13 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
+  ChannelTaskExecutor dataChannelTaskExecutor(
       ThreadPoolTaskExecutor taskExecutor,
       DataChannel dataChannel,
       Consumer<String, AbstractMessageTo> dataChannelConsumer,
       WorkAssignor dataChannelWorkAssignor)
   {
-    return new ConsumerTaskExecutor(
+    return new ChannelTaskExecutor(
         taskExecutor,
         dataChannel,
         dataChannelConsumer,
index e01e012..0dfba9d 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend;
 
 import de.juplo.kafka.chat.backend.implementation.kafka.*;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -43,21 +44,25 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
   @BeforeAll
   public static void sendAndLoadStoredData(
       @Autowired KafkaTemplate<String, String> messageTemplate,
-      @Autowired ConsumerTaskRunner consumerTaskRunner)
+      @Autowired ChannelTaskRunner channelTaskRunner)
   {
     KafkaTestUtils.sendAndLoadStoredData(
         messageTemplate,
         INFO_TOPIC,
         DATA_TOPIC,
-        consumerTaskRunner);
+        channelTaskRunner);
   }
 
   @AfterAll
-  static void joinConsumerTasks(
-      @Autowired ConsumerTaskRunner consumerTaskRunner)
+  static void joinChannels(
+      @Autowired Consumer dataChannelConsumer,
+      @Autowired Consumer infoChannelConsumer,
+      @Autowired ChannelTaskRunner channelTaskRunner)
       throws InterruptedException
   {
-    KafkaTestUtils.joinConsumerTasks(consumerTaskRunner);
+    dataChannelConsumer.wakeup();
+    infoChannelConsumer.wakeup();
+    channelTaskRunner.joinChannels();
   }
 
 
index d859b14..3be9a35 100644 (file)
@@ -3,7 +3,7 @@ package de.juplo.kafka.chat.backend.domain;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
+import de.juplo.kafka.chat.backend.implementation.kafka.ChannelNotReadyException;
 import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
 import de.juplo.kafka.chat.backend.implementation.inmemory.InMemoryServicesConfiguration;
 import de.juplo.kafka.chat.backend.implementation.kafka.KafkaServicesConfiguration;
@@ -49,7 +49,7 @@ public abstract class ChatHomeServiceTest
         .log("testGetExistingChatroom")
         .retryWhen(Retry
             .backoff(5, Duration.ofSeconds(1))
-            .filter(throwable -> throwable instanceof LoadInProgressException));
+            .filter(throwable -> throwable instanceof ChannelNotReadyException));
 
     // Then
     assertThat(mono).emitsCount(1);
@@ -68,7 +68,7 @@ public abstract class ChatHomeServiceTest
         .log("testGetNonExistentChatroom")
         .retryWhen(Retry
             .backoff(5, Duration.ofSeconds(1))
-            .filter(throwable -> throwable instanceof LoadInProgressException));
+            .filter(throwable -> throwable instanceof ChannelNotReadyException));
 
     // Then
     assertThat(mono).sendsError(e ->
index 5eeac47..6d15675 100644 (file)
@@ -1,6 +1,6 @@
 package de.juplo.kafka.chat.backend.domain;
 
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
+import de.juplo.kafka.chat.backend.implementation.kafka.ChannelNotReadyException;
 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
@@ -33,7 +33,7 @@ public abstract class ChatHomeServiceWithShardsTest extends ChatHomeServiceTest
         .log("testGetChatroomForNotOwnedShard")
         .retryWhen(Retry
             .backoff(5, Duration.ofSeconds(1))
-            .filter(throwable -> throwable instanceof LoadInProgressException));
+            .filter(throwable -> throwable instanceof ChannelNotReadyException));
 
     // Then
     assertThat(mono).sendsError(e ->
index 180ff15..a017cf3 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -44,20 +45,24 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
   @BeforeAll
   static void sendAndLoadStoredData(
       @Autowired KafkaTemplate<String, String> messageTemplate,
-      @Autowired ConsumerTaskRunner consumerTaskRunner)
+      @Autowired ChannelTaskRunner channelTaskRunner)
   {
     KafkaTestUtils.sendAndLoadStoredData(
         messageTemplate,
         INFO_TOPIC,
         DATA_TOPIC,
-        consumerTaskRunner);
+        channelTaskRunner);
   }
 
   @AfterAll
-  static void joinConsumerTasks(
-      @Autowired ConsumerTaskRunner consumerTaskRunner)
+  static void joinChannels(
+      @Autowired Consumer dataChannelConsumer,
+      @Autowired Consumer infoChannelConsumer,
+      @Autowired ChannelTaskRunner channelTaskRunner)
       throws InterruptedException
   {
-    KafkaTestUtils.joinConsumerTasks(consumerTaskRunner);
+    dataChannelConsumer.wakeup();
+    infoChannelConsumer.wakeup();
+    channelTaskRunner.joinChannels();
   }
 }
index 2ede202..6ea4772 100644 (file)
@@ -44,7 +44,7 @@ public abstract class KafkaTestUtils
       KafkaTemplate<String, String> messageTemplate,
       String infoTopic,
       String dataTopic,
-      ConsumerTaskRunner consumerTaskRunner)
+      ChannelTaskRunner channelTaskRunner)
   {
     send(messageTemplate, infoTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created");
     send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
@@ -52,7 +52,7 @@ public abstract class KafkaTestUtils
     send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
     send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
 
-    consumerTaskRunner.executeConsumerTasks();
+    channelTaskRunner.executeChannels();
   }
 
   private static void send(
@@ -71,9 +71,4 @@ public abstract class KafkaTestUtils
         value,
         new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
   }
-
-  public static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner) throws InterruptedException
-  {
-    consumerTaskRunner.joinConsumerTasks();
-  }
 }