X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FDataChannel.java;h=b4cc33f58826ed7a77d0733a69ec58dfde235018;hb=13f86063f851fc2c4ad6de56c8edb78bff9d0592;hp=d94bc659af1368565a3e14c33eb0b40583e1ff3f;hpb=58522d1ddff4795d9c924d90cd1695d6c8d30a38;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index d94bc659..b4cc33f5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -4,19 +4,14 @@ import de.juplo.kafka.chat.backend.domain.*; import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.*; @@ -25,98 +20,69 @@ import java.util.stream.IntStream; @Slf4j -public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener +public class DataChannel implements Runnable, ConsumerRebalanceListener { + private final String instanceId; private final String topic; private final Producer producer; private final Consumer consumer; private final ZoneId zoneId; private final int numShards; + private final Duration pollingInterval; private final int bufferSize; private final Clock clock; private final boolean[] isShardOwned; private final long[] currentOffset; private final long[] nextOffset; - private final Map[] chatRoomInfo; private final Map[] chatRoomData; + private final InfoChannel infoChannel; + private final ShardingPublisherStrategy shardingPublisherStrategy; private boolean running; @Getter private volatile boolean loadInProgress; - public ChatRoomChannel( + public DataChannel( + String instanceId, String topic, Producer producer, - Consumer consumer, + Consumer dataChannelConsumer, ZoneId zoneId, int numShards, + Duration pollingInterval, int bufferSize, - Clock clock) + Clock clock, + InfoChannel infoChannel, + ShardingPublisherStrategy shardingPublisherStrategy) { log.debug( - "Creating ChatRoomChannel for topic {} with {} partitions", + "{}: Creating DataChannel for topic {} with {} partitions", + instanceId, topic, numShards); + this.instanceId = instanceId; this.topic = topic; - this.consumer = consumer; + this.consumer = dataChannelConsumer; this.producer = producer; this.zoneId = zoneId; this.numShards = numShards; + this.pollingInterval = pollingInterval; this.bufferSize = bufferSize; this.clock = clock; this.isShardOwned = new boolean[numShards]; this.currentOffset = new long[numShards]; this.nextOffset = new long[numShards]; - this.chatRoomInfo = new Map[numShards]; this.chatRoomData = new Map[numShards]; IntStream .range(0, numShards) - .forEach(shard -> - { - this.chatRoomInfo[shard] = new HashMap<>(); - this.chatRoomData[shard] = new HashMap<>(); - }); + .forEach(shard -> this.chatRoomData[shard] = new HashMap<>()); + this.infoChannel = infoChannel; + this.shardingPublisherStrategy = shardingPublisherStrategy; } - Mono sendCreateChatRoomRequest( - UUID chatRoomId, - String name) - { - CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name); - return Mono.create(sink -> - { - ProducerRecord record = - new ProducerRecord<>( - topic, - chatRoomId.toString(), - createChatRoomRequestTo); - - producer.send(record, ((metadata, exception) -> - { - if (metadata != null) - { - log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo); - ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition()); - createChatRoom(chatRoomInfo); - sink.success(chatRoomInfo); - } - else - { - // On send-failure - log.error( - "Could not send create-request for chat room (id={}, name={}): {}", - chatRoomId, - name, - exception); - sink.error(exception); - } - })); - }); - } - Mono sendChatMessage( UUID chatRoomId, Message.MessageKey key, @@ -136,7 +102,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener producer.send(record, ((metadata, exception) -> { - if (metadata != null) + if (exception == null) { // On successful send Message message = new Message(key, metadata.offset(), timestamp, text); @@ -178,6 +144,20 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener currentOffset); consumer.seek(topicPartition, nextOffset[partition]); + infoChannel.sendShardAssignedEvent(partition); + shardingPublisherStrategy + .publishOwnership(partition) + .doOnSuccess(instanceId -> log.info( + "Successfully published instance {} as owner of shard {}", + instanceId, + partition)) + .doOnError(throwable -> log.error( + "Could not publish instance {} as owner of shard {}: {}", + instanceId, + partition, + throwable.toString())) + .onErrorComplete() + .block(); }); consumer.resume(partitions); @@ -190,7 +170,9 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener { int partition = topicPartition.partition(); isShardOwned[partition] = false; + nextOffset[partition] = consumer.position(topicPartition); log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); + infoChannel.sendShardRevokedEvent(partition); }); } @@ -211,12 +193,12 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + ConsumerRecords records = consumer.poll(pollingInterval); log.info("Fetched {} messages", records.count()); if (loadInProgress) { - loadChatRoom(records); + loadChatRoomData(records); if (isLoadingCompleted()) { @@ -244,7 +226,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener log.info("Exiting normally"); } - private void loadChatRoom(ConsumerRecords records) + private void loadChatRoomData(ConsumerRecords records) { for (ConsumerRecord record : records) { @@ -252,13 +234,6 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener switch (record.value().getType()) { - case COMMAND_CREATE_CHATROOM: - createChatRoom( - chatRoomId, - (CommandCreateChatRoomTo) record.value(), - record.partition()); - break; - case EVENT_CHATMESSAGE_RECEIVED: Instant instant = Instant.ofEpochSecond(record.timestamp()); LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); @@ -282,42 +257,6 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener } } - private void createChatRoom( - UUID chatRoomId, - CommandCreateChatRoomTo createChatRoomRequestTo, - Integer partition) - { - log.info( - "Loading ChatRoom {} for shard {} with buffer-size {}", - chatRoomId, - partition, - bufferSize); - KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); - ChatRoomData chatRoomData = new ChatRoomData( - clock, - service, - bufferSize); - putChatRoom( - chatRoomId, - createChatRoomRequestTo.getName(), - partition, - chatRoomData); - } - - - private void createChatRoom(ChatRoomInfo chatRoomInfo) - { - UUID id = chatRoomInfo.getId(); - log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); - KafkaChatMessageService service = new KafkaChatMessageService(this, id); - ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize); - putChatRoom( - chatRoomInfo.getId(), - chatRoomInfo.getName(), - chatRoomInfo.getShard(), - chatRoomData); - } - private void loadChatMessage( UUID chatRoomId, LocalDateTime timestamp, @@ -328,10 +267,17 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); - ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId); + ChatRoomData chatRoomData = this + .chatRoomData[partition] + .computeIfAbsent(chatRoomId, this::computeChatRoomData); KafkaChatMessageService kafkaChatRoomService = (KafkaChatMessageService) chatRoomData.getChatRoomService(); + log.debug( + "Loaded message from partition={} at offset={}: {}", + partition, + offset, + message); kafkaChatRoomService.persistMessage(message); } @@ -340,7 +286,12 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener return IntStream .range(0, numShards) .filter(shard -> isShardOwned[shard]) - .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]); + .allMatch(shard -> + { + TopicPartition partition = new TopicPartition(topic, shard); + long position = consumer.position(partition); + return position >= currentOffset[shard]; + }); } private void pauseAllOwnedPartions() @@ -353,33 +304,6 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener } - private void putChatRoom( - UUID chatRoomId, - String name, - Integer partition, - ChatRoomData chatRoomData) - { - if (this.chatRoomInfo[partition].containsKey(chatRoomId)) - { - log.warn( - "Ignoring existing chat-room for {}: {}", - partition, - chatRoomId); - } - else - { - log.info( - "Adding new chat-room to partition {}: {}", - partition, - chatRoomData); - - this.chatRoomInfo[partition].put( - chatRoomId, - new ChatRoomInfo(chatRoomId, name, partition)); - this.chatRoomData[partition].put(chatRoomId, chatRoomData); - } - } - int[] getOwnedShards() { return IntStream @@ -397,32 +321,24 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener if (!isShardOwned[shard]) { - return Mono.error(new ShardNotOwnedException(shard)); + return Mono.error(new ShardNotOwnedException(instanceId, shard)); } - return Mono.justOrEmpty(chatRoomData[shard].get(id)); + return infoChannel + .getChatRoomInfo(id) + .map(chatRoomInfo -> + chatRoomData[shard].computeIfAbsent(id, this::computeChatRoomData)); } - Flux getChatRoomInfo() + private ChatRoomData computeChatRoomData(UUID chatRoomId) { - return Flux - .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i))) - .filter(shard -> isShardOwned[shard]) - .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values())); + log.info("Creating ChatRoom {} with buffer-size {}", chatRoomId, bufferSize); + KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); + return new ChatRoomData(clock, service, bufferSize); } - Mono getChatRoomInfo(int shard, UUID id) + ConsumerGroupMetadata getConsumerGroupMetadata() { - if (loadInProgress) - { - return Mono.error(new LoadInProgressException()); - } - - if (!isShardOwned[shard]) - { - return Mono.error(new ShardNotOwnedException(shard)); - } - - return Mono.justOrEmpty(chatRoomInfo[shard].get(id)); + return consumer.groupMetadata(); } }