* Renamed attributes and methods to match the renamed classes.
* Introduced the interface `Channel` and the enum `ChannelState`.
* `DataChannel` and `InfoChannel` maintain a `ChannelState` instead of just a
  plain boolean that only reflects the loading state.
* The `ChannelTaskRunner` waits until both channels have entered the state
  `ChannelState.SHUTTING_DOWN` (see the sketch after this list).
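
A minimal sketch of how a caller might poll the new `ChannelState` before
using a channel, analogous to the shutdown polling in `ChannelTaskRunner`;
the helper `awaitReady` and its timeout parameter are purely illustrative
and not part of this change:

    package de.juplo.kafka.chat.backend.implementation.kafka;

    public class ChannelStateExample
    {
      // Expected lifecycle: STARTING -> LOAD_IN_PROGRESS -> READY -> SHUTTING_DOWN.
      // Blocks until the channel reports READY, or fails with the new
      // ChannelNotReadyException once the (hypothetical) timeout expires.
      public static void awaitReady(Channel channel, long timeoutMillis)
          throws InterruptedException
      {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (channel.getChannelState() != ChannelState.READY)
        {
          if (System.currentTimeMillis() > deadline)
          {
            throw new ChannelNotReadyException(channel.getChannelState());
          }
          Thread.sleep(100);
        }
      }
    }
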
--- /dev/null
+package de.juplo.kafka.chat.backend.implementation.kafka;
+
+public interface Channel extends Runnable
+{
+ ChannelState getChannelState();
+}
-package de.juplo.kafka.chat.backend.domain.exceptions;
+package de.juplo.kafka.chat.backend.implementation.kafka;
-public class LoadInProgressException extends IllegalStateException
+public class ChannelNotReadyException extends IllegalStateException
{
- public LoadInProgressException()
+ public final ChannelState state;
+
+ public ChannelNotReadyException(ChannelState state)
{
- super("Load in progress...");
+ super("Not ready! Current state: " + state);
+ this.state = state;
}
}
--- /dev/null
+package de.juplo.kafka.chat.backend.implementation.kafka;
+
+public enum ChannelState
+{
+ STARTING,
+ LOAD_IN_PROGRESS,
+ READY,
+ SHUTTING_DOWN
+}
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import jakarta.annotation.PreDestroy;
+import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
@RequiredArgsConstructor
@Slf4j
-public class ConsumerTaskExecutor
+public class ChannelTaskExecutor
{
private final ThreadPoolTaskExecutor taskExecutor;
- private final Runnable consumerTask;
+ @Getter
+ private final Channel channel;
private final Consumer<String, AbstractMessageTo> consumer;
private final WorkAssignor workAssignor;
- CompletableFuture<Void> consumerTaskJob;
+ CompletableFuture<Void> channelTaskJob;
- public void executeConsumerTask()
+ public void executeChannelTask()
{
workAssignor.assignWork(consumer);
- log.info("Starting the consumer-task for {}", consumerTask);
- consumerTaskJob = taskExecutor
- .submitCompletable(consumerTask)
+ log.info("Starting the consumer-task for {}", channel);
+ channelTaskJob = taskExecutor
+ .submitCompletable(channel)
.exceptionally(e ->
{
- log.error("The consumer-task for {} exited abnormally!", consumerTask, e);
+ log.error("The consumer-task for {} exited abnormally!", channel, e);
return null;
});
}
@PreDestroy
- public void joinConsumerTaskJob()
+ public void join()
{
- log.info("Signaling the consumer-task for {} to quit its work", consumerTask);
+ log.info("Signaling the consumer-task for {} to quit its work", channel);
consumer.wakeup();
- log.info("Waiting for the consumer of {} to finish its work", consumerTask);
- consumerTaskJob.join();
- log.info("Joined the consumer-task for {}", consumerTask);
+ log.info("Waiting for the consumer of {} to finish its work", channel);
+ channelTaskJob.join();
+ log.info("Joined the consumer-task for {}", channel);
}
}
@RequiredArgsConstructor
@Slf4j
-public class ConsumerTaskRunner
+public class ChannelTaskRunner
{
- private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor;
- private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor;
- private final InfoChannel infoChannel;
+ private final ChannelTaskExecutor infoChannelTaskExecutor;
+ private final ChannelTaskExecutor dataChannelTaskExecutor;
- public void executeConsumerTasks()
+ public void executeChannels()
{
- infoChannelConsumerTaskExecutor.executeConsumerTask();
- dataChannelConsumerTaskExecutor.executeConsumerTask();
+ infoChannelTaskExecutor.executeChannelTask();
+ dataChannelTaskExecutor.executeChannelTask();
}
- public void joinConsumerTasks() throws InterruptedException
+ public void joinChannels() throws InterruptedException
{
- dataChannelConsumerTaskExecutor.joinConsumerTaskJob();
- while (infoChannel.isLoadInProgress())
+ joinChannel(dataChannelTaskExecutor);
+ joinChannel(infoChannelTaskExecutor);
+ }
+
+ private void joinChannel(
+ ChannelTaskExecutor channelTaskExecutor)
+ throws InterruptedException
+ {
+ Channel channel = channelTaskExecutor.getChannel();
+ while (channel.getChannelState() != ChannelState.SHUTTING_DOWN)
{
- log.info("Waiting for {} to finish loading...", infoChannel);
+ log.info("Waiting for {} to shut down...", channel);
Thread.sleep(1000);
}
- infoChannelConsumerTaskExecutor.joinConsumerTaskJob();
+ channelTaskExecutor.join();
}
}
package de.juplo.kafka.chat.backend.implementation.kafka;
-import de.juplo.kafka.chat.backend.domain.*;
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
+import de.juplo.kafka.chat.backend.domain.ChatRoomData;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
import lombok.Getter;
+import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.Producer;
import reactor.core.publisher.Mono;
import java.time.*;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
import java.util.stream.IntStream;
+@ToString(of = { "topic", "instanceId" })
@Slf4j
-public class DataChannel implements Runnable, ConsumerRebalanceListener
+public class DataChannel implements Channel, ConsumerRebalanceListener
{
private final String instanceId;
private final String topic;
private boolean running;
@Getter
- private volatile boolean loadInProgress;
+ private volatile ChannelState channelState = ChannelState.STARTING;
public DataChannel(
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
{
log.info("Newly assigned partitions! Pausing normal operations...");
- loadInProgress = true;
+ channelState = ChannelState.LOAD_IN_PROGRESS;
consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
{
ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(pollingInterval);
log.info("Fetched {} messages", records.count());
- if (loadInProgress)
+ switch (channelState)
{
- loadChatRoomData(records);
-
- if (isLoadingCompleted())
+ case LOAD_IN_PROGRESS ->
{
- log.info("Loading of messages completed! Pausing all owned partitions...");
- pauseAllOwnedPartions();
- log.info("Resuming normal operations...");
- loadInProgress = false;
+ loadChatRoomData(records);
+
+ if (isLoadingCompleted())
+ {
+ log.info("Loading of messages completed! Pausing all owned partitions...");
+ pauseAllOwnedPartions();
+ log.info("Resuming normal operations...");
+ channelState = ChannelState.READY;
+ }
}
- }
- else
- {
- if (!records.isEmpty())
+ case SHUTTING_DOWN -> log.info("Shutdown in progress: ignoring {} fetched messages.", records.count());
+ default ->
{
- throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
+ if (!records.isEmpty())
+ {
+ throw new IllegalStateException("All owned partitions should be paused when in state " + channelState);
+ }
}
}
}
catch (WakeupException e)
{
log.info("Received WakeupException, exiting!");
+ channelState = ChannelState.SHUTTING_DOWN;
running = false;
}
}
Mono<ChatRoomData> getChatRoomData(int shard, UUID id)
{
- if (loadInProgress)
+ ChannelState capturedState = channelState;
+ if (capturedState != ChannelState.READY)
{
- return Mono.error(new LoadInProgressException());
+ return Mono.error(new ChannelNotReadyException(capturedState));
}
if (!isShardOwned[shard])
package de.juplo.kafka.chat.backend.implementation.kafka;
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
import lombok.Getter;
+import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.WakeupException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.net.URI;
-import java.time.*;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.stream.IntStream;
+@ToString(of = { "topic", "instanceUri" })
@Slf4j
-public class InfoChannel implements Runnable
+public class InfoChannel implements Channel
{
private final String topic;
private final Producer<String, AbstractMessageTo> producer;
private boolean running;
@Getter
- private volatile boolean loadInProgress = true;
+ private volatile ChannelState channelState = ChannelState.STARTING;
public InfoChannel(
IntStream
.range(0, numShards)
.forEach(partition -> this.nextOffset[partition] = 0l);
- loadInProgress = true;
+ channelState = ChannelState.LOAD_IN_PROGRESS;
while (running)
{
catch (WakeupException e)
{
log.info("Received WakeupException, exiting!");
+ channelState = ChannelState.SHUTTING_DOWN;
running = false;
}
}
private void updateNextOffset(int partition, long nextOffset)
{
this.nextOffset[partition] = nextOffset;
- if (loadInProgress) {
- loadInProgress = IntStream
+ if (channelState == ChannelState.LOAD_IN_PROGRESS)
+ {
+ boolean loadInProgress = IntStream
.range(0, numShards)
.anyMatch(shard -> this.nextOffset[shard] < currentOffset[partition]);
+ if (!loadInProgress)
+ {
+ channelState = ChannelState.READY;
+ }
}
}
Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
{
- if (loadInProgress)
+ ChannelState capturedState = channelState;
+ if (capturedState != ChannelState.READY)
{
- return Mono.error(new LoadInProgressException());
+ return Mono.error(new ChannelNotReadyException(capturedState));
}
return Mono.fromSupplier(() -> chatRoomInfo.get(id));
@RequiredArgsConstructor
public class KafkaServicesApplicationRunner implements ApplicationRunner
{
- private final ConsumerTaskRunner consumerTaskRunner;
+ private final ChannelTaskRunner channelTaskRunner;
@Override
public void run(ApplicationArguments args)
{
- consumerTaskRunner.executeConsumerTasks();
+ channelTaskRunner.executeChannels();
}
@PreDestroy
- public void joinConsumerTasks() throws InterruptedException
+ public void joinChannels() throws InterruptedException
{
- consumerTaskRunner.joinConsumerTasks();
+ channelTaskRunner.joinChannels();
}
}
package de.juplo.kafka.chat.backend.implementation.kafka;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
public class KafkaServicesConfiguration
{
@Bean
- ConsumerTaskRunner consumerTaskRunner(
- ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
- ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
- InfoChannel infoChannel)
+ ChannelTaskRunner channelTaskRunner(
+ ChannelTaskExecutor infoChannelTaskExecutor,
+ ChannelTaskExecutor dataChannelTaskExecutor)
{
- return new ConsumerTaskRunner(
- infoChannelConsumerTaskExecutor,
- dataChannelConsumerTaskExecutor,
- infoChannel);
+ return new ChannelTaskRunner(
+ infoChannelTaskExecutor,
+ dataChannelTaskExecutor);
}
@Bean
- ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
+ ChannelTaskExecutor infoChannelTaskExecutor(
ThreadPoolTaskExecutor taskExecutor,
InfoChannel infoChannel,
Consumer<String, AbstractMessageTo> infoChannelConsumer,
WorkAssignor infoChannelWorkAssignor)
{
- return new ConsumerTaskExecutor(
+ return new ChannelTaskExecutor(
taskExecutor,
infoChannel,
infoChannelConsumer,
}
@Bean
- ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
+ ChannelTaskExecutor dataChannelTaskExecutor(
ThreadPoolTaskExecutor taskExecutor,
DataChannel dataChannel,
Consumer<String, AbstractMessageTo> dataChannelConsumer,
WorkAssignor dataChannelWorkAssignor)
{
- return new ConsumerTaskExecutor(
+ return new ChannelTaskExecutor(
taskExecutor,
dataChannel,
dataChannelConsumer,
import de.juplo.kafka.chat.backend.implementation.kafka.*;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.beans.factory.annotation.Autowired;
@BeforeAll
public static void sendAndLoadStoredData(
@Autowired KafkaTemplate<String, String> messageTemplate,
- @Autowired ConsumerTaskRunner consumerTaskRunner)
+ @Autowired ChannelTaskRunner channelTaskRunner)
{
KafkaTestUtils.sendAndLoadStoredData(
messageTemplate,
INFO_TOPIC,
DATA_TOPIC,
- consumerTaskRunner);
+ channelTaskRunner);
}
@AfterAll
- static void joinConsumerTasks(
- @Autowired ConsumerTaskRunner consumerTaskRunner)
+ static void joinChannels(
+ @Autowired Consumer dataChannelConsumer,
+ @Autowired Consumer infoChannelConsumer,
+ @Autowired ChannelTaskRunner channelTaskRunner)
throws InterruptedException
{
- KafkaTestUtils.joinConsumerTasks(consumerTaskRunner);
+ dataChannelConsumer.wakeup();
+ infoChannelConsumer.wakeup();
+ channelTaskRunner.joinChannels();
}
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
+import de.juplo.kafka.chat.backend.implementation.kafka.ChannelNotReadyException;
import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
import de.juplo.kafka.chat.backend.implementation.inmemory.InMemoryServicesConfiguration;
import de.juplo.kafka.chat.backend.implementation.kafka.KafkaServicesConfiguration;
.log("testGetExistingChatroom")
.retryWhen(Retry
.backoff(5, Duration.ofSeconds(1))
- .filter(throwable -> throwable instanceof LoadInProgressException));
+ .filter(throwable -> throwable instanceof ChannelNotReadyException));
// Then
assertThat(mono).emitsCount(1);
.log("testGetNonExistentChatroom")
.retryWhen(Retry
.backoff(5, Duration.ofSeconds(1))
- .filter(throwable -> throwable instanceof LoadInProgressException));
+ .filter(throwable -> throwable instanceof ChannelNotReadyException));
// Then
assertThat(mono).sendsError(e ->
package de.juplo.kafka.chat.backend.domain;
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
+import de.juplo.kafka.chat.backend.implementation.kafka.ChannelNotReadyException;
import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
.log("testGetChatroomForNotOwnedShard")
.retryWhen(Retry
.backoff(5, Duration.ofSeconds(1))
- .filter(throwable -> throwable instanceof LoadInProgressException));
+ .filter(throwable -> throwable instanceof ChannelNotReadyException));
// Then
assertThat(mono).sendsError(e ->
import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.beans.factory.annotation.Autowired;
@BeforeAll
static void sendAndLoadStoredData(
@Autowired KafkaTemplate<String, String> messageTemplate,
- @Autowired ConsumerTaskRunner consumerTaskRunner)
+ @Autowired ChannelTaskRunner channelTaskRunner)
{
KafkaTestUtils.sendAndLoadStoredData(
messageTemplate,
INFO_TOPIC,
DATA_TOPIC,
- consumerTaskRunner);
+ channelTaskRunner);
}
@AfterAll
- static void joinConsumerTasks(
- @Autowired ConsumerTaskRunner consumerTaskRunner)
+ static void joinChannels(
+ @Autowired Consumer dataChannelConsumer,
+ @Autowired Consumer infoChannelConsumer,
+ @Autowired ChannelTaskRunner channelTaskRunner)
throws InterruptedException
{
- KafkaTestUtils.joinConsumerTasks(consumerTaskRunner);
+ dataChannelConsumer.wakeup();
+ infoChannelConsumer.wakeup();
+ channelTaskRunner.joinChannels();
}
}
KafkaTemplate<String, String> messageTemplate,
String infoTopic,
String dataTopic,
- ConsumerTaskRunner consumerTaskRunner)
+ ChannelTaskRunner channelTaskRunner)
{
send(messageTemplate, infoTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created");
send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
- consumerTaskRunner.executeConsumerTasks();
+ channelTaskRunner.executeChannels();
}
private static void send(
value,
new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
}
-
- public static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner) throws InterruptedException
- {
- consumerTaskRunner.joinConsumerTasks();
- }
}