X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FDataChannel.java;h=42f17d14b62f1c9d2ffe70788e2affbe9e794a99;hb=132c1d0092fc8377d92a4ded7ef349d858ae92cd;hp=da906631be6469fa8902d2434645cdb726c4309f;hpb=a266c6d59d686df95b09d1ede931df508ab5de69;p=demos%2Fkafka%2Fchat

diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
index da906631..42f17d14 100644
--- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
+++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
@@ -1,16 +1,16 @@
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
-import de.juplo.kafka.chat.backend.domain.*;
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
+import de.juplo.kafka.chat.backend.domain.ChatRoomData;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
 import lombok.Getter;
+import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
@@ -18,52 +18,64 @@ import org.apache.kafka.common.errors.WakeupException;
 import reactor.core.publisher.Mono;
 
 import java.time.*;
-import java.util.*;
-import java.util.function.Function;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
 import java.util.stream.IntStream;
 
 
+@ToString(of = { "topic", "instanceId" })
 @Slf4j
-public class DataChannel implements Runnable, ConsumerRebalanceListener
+public class DataChannel implements Channel, ConsumerRebalanceListener
 {
+  private final String instanceId;
   private final String topic;
   private final Producer<String, AbstractMessageTo> producer;
   private final Consumer<String, AbstractMessageTo> consumer;
   private final ZoneId zoneId;
   private final int numShards;
-  private final int bufferSize;
+  private final Duration pollingInterval;
+  private final int historyLimit;
   private final Clock clock;
   private final boolean[] isShardOwned;
   private final long[] currentOffset;
  private final long[] nextOffset;
   private final Map<UUID, ChatRoomData>[] chatRoomData;
-  private final InfoChannel infoChannel;
+  private final ChannelMediator channelMediator;
+  private final ShardingPublisherStrategy shardingPublisherStrategy;
 
   private boolean running;
   @Getter
-  private volatile boolean loadInProgress;
+  private volatile ChannelState channelState = ChannelState.STARTING;
 
 
   public DataChannel(
+      String instanceId,
       String topic,
       Producer<String, AbstractMessageTo> producer,
       Consumer<String, AbstractMessageTo> dataChannelConsumer,
       ZoneId zoneId,
       int numShards,
-      int bufferSize,
+      Duration pollingInterval,
+      int historyLimit,
       Clock clock,
-      InfoChannel infoChannel)
+      ChannelMediator channelMediator,
+      ShardingPublisherStrategy shardingPublisherStrategy)
   {
     log.debug(
-        "Creating DataChannel for topic {} with {} partitions",
+        "{}: Creating DataChannel for topic {} with {} partitions",
+        instanceId,
         topic,
         numShards);
+    this.instanceId = instanceId;
     this.topic = topic;
     this.consumer = dataChannelConsumer;
     this.producer = producer;
     this.zoneId = zoneId;
     this.numShards = numShards;
-    this.bufferSize = bufferSize;
+    this.pollingInterval = pollingInterval;
+    this.historyLimit = historyLimit;
     this.clock = clock;
     this.isShardOwned = new boolean[numShards];
     this.currentOffset = new long[numShards];
@@ -72,7 +84,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     IntStream
         .range(0, numShards)
         .forEach(shard -> this.chatRoomData[shard] = new HashMap<>());
-    this.infoChannel = infoChannel;
+    this.channelMediator = channelMediator;
+    this.shardingPublisherStrategy = shardingPublisherStrategy;
   }
 
 
@@ -96,7 +109,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
     producer.send(record, ((metadata, exception) ->
     {
-      if (metadata != null)
+      if (exception == null)
      {
         // On successful send
         Message message = new Message(key, metadata.offset(), timestamp, text);
@@ -123,7 +136,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
   {
     log.info("Newly assigned partitions! Pausing normal operations...");
-    loadInProgress = true;
+    channelState = ChannelState.LOAD_IN_PROGRESS;
 
     consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
     {
@@ -138,6 +151,20 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
           currentOffset);
 
       consumer.seek(topicPartition, nextOffset[partition]);
+      channelMediator.shardAssigned(partition);
+      shardingPublisherStrategy
+          .publishOwnership(partition)
+          .doOnSuccess(instanceId -> log.info(
+              "Successfully published instance {} as owner of shard {}",
+              instanceId,
+              partition))
+          .doOnError(throwable -> log.error(
+              "Could not publish instance {} as owner of shard {}: {}",
+              instanceId,
+              partition,
+              throwable.toString()))
+          .onErrorComplete()
+          .block();
     });
 
     consumer.resume(partitions);
@@ -150,7 +177,15 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     {
       int partition = topicPartition.partition();
       isShardOwned[partition] = false;
+      nextOffset[partition] = consumer.position(topicPartition);
+
       log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
+
+      chatRoomData[partition]
+          .values()
+          .forEach(chatRoomData -> chatRoomData.deactivate());
+
+      channelMediator.shardRevoked(partition);
     });
   }
 
@@ -171,32 +206,38 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     {
      try
       {
-        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
+        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(pollingInterval);
         log.info("Fetched {} messages", records.count());
 
-        if (loadInProgress)
+        switch (channelState)
         {
-          loadChatRoomData(records);
-
-          if (isLoadingCompleted())
+          case LOAD_IN_PROGRESS ->
           {
-            log.info("Loading of messages completed! Pausing all owned partitions...");
-            pauseAllOwnedPartions();
-            log.info("Resuming normal operations...");
-            loadInProgress = false;
+            loadChatRoomData(records);
+
+            if (isLoadingCompleted())
+            {
+              log.info("Loading of messages completed! Pausing all owned partitions...");
+              pauseAllOwnedPartions();
+              activateAllOwnedChatRooms();
+              log.info("Resuming normal operations...");
+              channelState = ChannelState.READY;
+            }
           }
-        }
-        else
-        {
-          if (!records.isEmpty())
+          case SHUTTING_DOWN -> log.info("Shutdown in progress: ignoring {} fetched messages.", records.count());
+          default ->
           {
-            throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
+            if (!records.isEmpty())
+            {
+              throw new IllegalStateException("All owned partitions should be paused, when in state " + channelState);
+            }
           }
         }
       }
       catch (WakeupException e)
       {
         log.info("Received WakeupException, exiting!");
+        channelState = ChannelState.SHUTTING_DOWN;
         running = false;
       }
     }
@@ -245,12 +286,15 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
     Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
     Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
-    ChatRoomData chatRoomData = this
-        .chatRoomData[partition]
-        .computeIfAbsent(chatRoomId, this::computeChatRoomData);
+    ChatRoomData chatRoomData = computeChatRoomData(chatRoomId, partition);
     KafkaChatMessageService kafkaChatRoomService =
         (KafkaChatMessageService) chatRoomData.getChatRoomService();
 
+    log.debug(
+        "Loaded message from partition={} at offset={}: {}",
+        partition,
+        offset,
+        message);
     kafkaChatRoomService.persistMessage(message);
   }
 
@@ -259,7 +303,12 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     return IntStream
         .range(0, numShards)
         .filter(shard -> isShardOwned[shard])
-        .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
+        .allMatch(shard ->
+        {
+          TopicPartition partition = new TopicPartition(topic, shard);
+          long position = consumer.position(partition);
+          return position >= currentOffset[shard];
+        });
   }
 
   private void pauseAllOwnedPartions()
@@ -271,6 +320,16 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
         .toList());
   }
 
+  private void activateAllOwnedChatRooms()
+  {
+    IntStream
+        .range(0, numShards)
+        .filter(shard -> isShardOwned[shard])
+        .forEach(shard -> chatRoomData[shard]
+            .values()
+            .forEach(chatRoomData -> chatRoomData.activate()));
+  }
+
 
   int[] getOwnedShards()
   {
@@ -280,28 +339,52 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
         .toArray();
   }
 
+  void createChatRoomData(ChatRoomInfo chatRoomInfo)
+  {
+    computeChatRoomData(
+        chatRoomInfo.getId(),
+        chatRoomInfo.getShard());
+  }
+
   Mono<ChatRoomData> getChatRoomData(int shard, UUID id)
   {
-    if (loadInProgress)
+    ChannelState capturedState = channelState;
+    if (capturedState != ChannelState.READY)
     {
-      return Mono.error(new LoadInProgressException());
+      return Mono.error(new ChannelNotReadyException(capturedState));
     }
 
     if (!isShardOwned[shard])
     {
-      return Mono.error(new ShardNotOwnedException(shard));
+      return Mono.error(new ShardNotOwnedException(instanceId, shard));
+    }
+
+    return Mono.justOrEmpty(chatRoomData[shard].get(id));
+  }
+
+  private ChatRoomData computeChatRoomData(UUID chatRoomId, int shard)
+  {
+    ChatRoomData chatRoomData = this.chatRoomData[shard].get(chatRoomId);
+
+    if (chatRoomData != null)
+    {
+      log.info(
+          "Ignoring request to create already existing ChatRoomData for {}",
+          chatRoomId);
+    }
+    else
+    {
+      log.info("Creating ChatRoomData {} with history-limit {}", chatRoomId, historyLimit);
+      KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
+      chatRoomData = new ChatRoomData(clock, service, historyLimit);
+      this.chatRoomData[shard].put(chatRoomId, chatRoomData);
     }
 
-    return infoChannel
-        .getChatRoomInfo(id)
-        .map(chatRoomInfo ->
-            chatRoomData[shard].computeIfAbsent(id, this::computeChatRoomData));
+    return chatRoomData;
   }
 
-  private ChatRoomData computeChatRoomData(UUID chatRoomId)
+  ConsumerGroupMetadata getConsumerGroupMetadata()
   {
-    log.info("Creating ChatRoom {} with buffer-size {}", chatRoomId, bufferSize);
-    KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
-    return new ChatRoomData(clock, service, bufferSize);
+    return consumer.groupMetadata();
   }
 }