X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FInfoChannel.java;h=f08a8d853c8bd0de3f53ec5206c601d263adea44;hb=f5374ebf5f6d28fae6aa2457d56cef1f8ee88f87;hp=d94bc659af1368565a3e14c33eb0b40583e1ff3f;hpb=6500199fa457b2c94497952ea0aaf8be6de5cdda;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index d94bc659..f08a8d85 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -1,113 +1,108 @@ package de.juplo.kafka.chat.backend.implementation.kafka; -import de.juplo.kafka.chat.backend.domain.*; -import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; -import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo; -import lombok.Getter; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.errors.WakeupException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import java.net.URI; import java.time.*; -import java.util.*; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; import java.util.stream.IntStream; @Slf4j -public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener +public class InfoChannel implements Runnable { private final String topic; private final Producer producer; private final Consumer consumer; - private final ZoneId zoneId; private final int numShards; - private final int bufferSize; - private final Clock clock; - private final boolean[] isShardOwned; + private final String[] shardOwners; private final long[] currentOffset; private final long[] nextOffset; - private final Map[] chatRoomInfo; - private final Map[] chatRoomData; + private final Map chatRoomInfo; + private final String instanceUri; private boolean running; - @Getter - private volatile boolean loadInProgress; - public ChatRoomChannel( + public InfoChannel( String topic, Producer producer, - Consumer consumer, - ZoneId zoneId, - int numShards, - int bufferSize, - Clock clock) + Consumer infoChannelConsumer, + URI instanceUri) { log.debug( - "Creating ChatRoomChannel for topic {} with {} partitions", - topic, - numShards); + "Creating InfoChannel for topic {}", + topic); this.topic = topic; - this.consumer = consumer; + this.consumer = infoChannelConsumer; this.producer = producer; - this.zoneId = zoneId; - this.numShards = numShards; - this.bufferSize = bufferSize; - this.clock = clock; - this.isShardOwned = new boolean[numShards]; + this.chatRoomInfo = new HashMap<>(); + + this.numShards = consumer + .partitionsFor(topic) + .size(); + this.shardOwners = new String[numShards]; this.currentOffset = new long[numShards]; this.nextOffset = new long[numShards]; - this.chatRoomInfo = new Map[numShards]; - this.chatRoomData = new Map[numShards]; IntStream .range(0, numShards) - .forEach(shard -> - { - this.chatRoomInfo[shard] = new HashMap<>(); - this.chatRoomData[shard] = new HashMap<>(); - }); + .forEach(partition -> this.nextOffset[partition] = -1l); + + this.instanceUri = instanceUri.toASCIIString(); } + boolean loadInProgress() + { + return IntStream + .range(0, numShards) + .anyMatch(partition -> nextOffset[partition] < currentOffset[partition]); + } - Mono sendCreateChatRoomRequest( + Mono sendChatRoomCreatedEvent( UUID chatRoomId, - String name) + String name, + int shard) { - CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name); + EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard); return Mono.create(sink -> { ProducerRecord record = new ProducerRecord<>( topic, - chatRoomId.toString(), - createChatRoomRequestTo); + Integer.toString(shard), + to); producer.send(record, ((metadata, exception) -> { if (metadata != null) { - log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo); - ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition()); - createChatRoom(chatRoomInfo); + log.info("Successfully sent created event for chat chat-room: {}", to); + ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard); sink.success(chatRoomInfo); } else { // On send-failure log.error( - "Could not send create-request for chat room (id={}, name={}): {}", + "Could not send created event for chat-room (id={}, name={}): {}", chatRoomId, name, exception); @@ -117,122 +112,94 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener }); } - Mono sendChatMessage( - UUID chatRoomId, - Message.MessageKey key, - LocalDateTime timestamp, - String text) + Mono sendShardAssignedEvent(int shard) { - ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); - return Mono.create(sink -> - { - ProducerRecord record = - new ProducerRecord<>( - topic, - null, - zdt.toEpochSecond(), - chatRoomId.toString(), - EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text)); + EventShardAssigned to = EventShardAssigned.of(shard, instanceUri); - producer.send(record, ((metadata, exception) -> - { - if (metadata != null) - { - // On successful send - Message message = new Message(key, metadata.offset(), timestamp, text); - log.info("Successfully send message {}", message); - sink.success(message); - } - else - { - // On send-failure - log.error( - "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}", - chatRoomId, - key, - timestamp, - text, - exception); - sink.error(exception); - } - })); - }); - } + Sinks.One sink = Sinks.unsafe().one(); - @Override - public void onPartitionsAssigned(Collection partitions) - { - log.info("Newly assigned partitions! Pausing normal operations..."); - loadInProgress = true; + ProducerRecord record = + new ProducerRecord<>( + topic, + Integer.toString(shard), + to); - consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) -> + producer.send(record, ((metadata, exception) -> { - int partition = topicPartition.partition(); - isShardOwned[partition] = true; - this.currentOffset[partition] = currentOffset; - - log.info( - "Partition assigned: {} - loading messages: next={} -> current={}", - partition, - nextOffset[partition], - currentOffset); - - consumer.seek(topicPartition, nextOffset[partition]); - }); + if (metadata != null) + { + log.info("Successfully sent shard assigned event for shard: {}", shard); + sink.emitValue(metadata, Sinks.EmitFailureHandler.FAIL_FAST); + } + else + { + // On send-failure + log.error( + "Could not send shard assigned event for shard {}: {}", + shard, + exception); + sink.emitError(exception, Sinks.EmitFailureHandler.FAIL_FAST); + } + })); - consumer.resume(partitions); + return sink.asMono(); } - @Override - public void onPartitionsRevoked(Collection partitions) + Mono sendShardRevokedEvent(int shard) { - partitions.forEach(topicPartition -> + EventShardRevoked to = EventShardRevoked.of(shard, instanceUri); + + Sinks.One sink = Sinks.unsafe().one(); + + ProducerRecord record = + new ProducerRecord<>( + topic, + Integer.toString(shard), + to); + + producer.send(record, ((metadata, exception) -> { - int partition = topicPartition.partition(); - isShardOwned[partition] = false; - log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); - }); - } + if (metadata != null) + { + log.info("Successfully sent shard revoked event for shard: {}", shard); + sink.emitValue(metadata, Sinks.EmitFailureHandler.FAIL_FAST); + } + else + { + // On send-failure + log.error( + "Could not send shard revoked event for shard {}: {}", + shard, + exception); + sink.emitError(exception, Sinks.EmitFailureHandler.FAIL_FAST); + } + })); - @Override - public void onPartitionsLost(Collection partitions) - { - log.warn("Lost partitions: {}, partitions"); - // TODO: Muss auf den Verlust anders reagiert werden? - onPartitionsRevoked(partitions); + return sink.asMono(); } + @Override public void run() { running = true; + consumer + .endOffsets(consumer.assignment()) + .entrySet() + .stream() + .forEach(entry -> this.currentOffset[entry.getKey().partition()] = entry.getValue()); + IntStream + .range(0, numShards) + .forEach(partition -> this.nextOffset[partition] = 0l); + while (running) { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); - log.info("Fetched {} messages", records.count()); - - if (loadInProgress) - { - loadChatRoom(records); - - if (isLoadingCompleted()) - { - log.info("Loading of messages completed! Pausing all owned partitions..."); - pauseAllOwnedPartions(); - log.info("Resuming normal operations..."); - loadInProgress = false; - } - } - else - { - if (!records.isEmpty()) - { - throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!"); - } - } + ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); + log.debug("Fetched {} messages", records.count()); + handleMessages(records); } catch (WakeupException e) { @@ -244,36 +211,42 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener log.info("Exiting normally"); } - private void loadChatRoom(ConsumerRecords records) + private void handleMessages(ConsumerRecords records) { for (ConsumerRecord record : records) { - UUID chatRoomId = UUID.fromString(record.key()); - switch (record.value().getType()) { - case COMMAND_CREATE_CHATROOM: - createChatRoom( - chatRoomId, - (CommandCreateChatRoomTo) record.value(), - record.partition()); + case EVENT_CHATROOM_CREATED: + EventChatRoomCreated eventChatRoomCreated = + (EventChatRoomCreated) record.value(); + createChatRoom(eventChatRoomCreated.toChatRoomInfo()); break; - case EVENT_CHATMESSAGE_RECEIVED: - Instant instant = Instant.ofEpochSecond(record.timestamp()); - LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); - loadChatMessage( - chatRoomId, - timestamp, - record.offset(), - (EventChatMessageReceivedTo) record.value(), - record.partition()); + case EVENT_SHARD_ASSIGNED: + EventShardAssigned eventShardAssigned = + (EventShardAssigned) record.value(); + log.info( + "Shard {} was assigned to {}", + eventShardAssigned.getShard(), + eventShardAssigned.getUri()); + shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri(); + break; + + case EVENT_SHARD_REVOKED: + EventShardRevoked eventShardRevoked = + (EventShardRevoked) record.value(); + log.info( + "Shard {} was revoked from {}", + eventShardRevoked.getShard(), + eventShardRevoked.getUri()); + shardOwners[eventShardRevoked.getShard()] = null; break; default: log.debug( - "Ignoring message for chat-room {} with offset {}: {}", - chatRoomId, + "Ignoring message for key={} with offset={}: {}", + record.key(), record.offset(), record.value()); } @@ -282,84 +255,12 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener } } - private void createChatRoom( - UUID chatRoomId, - CommandCreateChatRoomTo createChatRoomRequestTo, - Integer partition) - { - log.info( - "Loading ChatRoom {} for shard {} with buffer-size {}", - chatRoomId, - partition, - bufferSize); - KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); - ChatRoomData chatRoomData = new ChatRoomData( - clock, - service, - bufferSize); - putChatRoom( - chatRoomId, - createChatRoomRequestTo.getName(), - partition, - chatRoomData); - } - - private void createChatRoom(ChatRoomInfo chatRoomInfo) { - UUID id = chatRoomInfo.getId(); - log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); - KafkaChatMessageService service = new KafkaChatMessageService(this, id); - ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize); - putChatRoom( - chatRoomInfo.getId(), - chatRoomInfo.getName(), - chatRoomInfo.getShard(), - chatRoomData); - } - - private void loadChatMessage( - UUID chatRoomId, - LocalDateTime timestamp, - long offset, - EventChatMessageReceivedTo chatMessageTo, - int partition) - { - Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); - Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); - - ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId); - KafkaChatMessageService kafkaChatRoomService = - (KafkaChatMessageService) chatRoomData.getChatRoomService(); - - kafkaChatRoomService.persistMessage(message); - } - - private boolean isLoadingCompleted() - { - return IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]); - } + UUID chatRoomId = chatRoomInfo.getId(); + Integer partition = chatRoomInfo.getShard(); - private void pauseAllOwnedPartions() - { - consumer.pause(IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .mapToObj(shard -> new TopicPartition(topic, shard)) - .toList()); - } - - - private void putChatRoom( - UUID chatRoomId, - String name, - Integer partition, - ChatRoomData chatRoomData) - { - if (this.chatRoomInfo[partition].containsKey(chatRoomId)) + if (this.chatRoomInfo.containsKey(chatRoomId)) { log.warn( "Ignoring existing chat-room for {}: {}", @@ -369,60 +270,26 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener else { log.info( - "Adding new chat-room to partition {}: {}", + "Adding new chat-room for partition {}: {}", partition, - chatRoomData); + chatRoomId); - this.chatRoomInfo[partition].put( - chatRoomId, - new ChatRoomInfo(chatRoomId, name, partition)); - this.chatRoomData[partition].put(chatRoomId, chatRoomData); + this.chatRoomInfo.put(chatRoomId, chatRoomInfo); } } - int[] getOwnedShards() - { - return IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .toArray(); - } - - Mono getChatRoomData(int shard, UUID id) + Flux getChatRoomInfo() { - if (loadInProgress) - { - return Mono.error(new LoadInProgressException()); - } - - if (!isShardOwned[shard]) - { - return Mono.error(new ShardNotOwnedException(shard)); - } - - return Mono.justOrEmpty(chatRoomData[shard].get(id)); + return Flux.fromIterable(chatRoomInfo.values()); } - Flux getChatRoomInfo() + Mono getChatRoomInfo(UUID id) { - return Flux - .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i))) - .filter(shard -> isShardOwned[shard]) - .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values())); + return Mono.fromSupplier(() -> chatRoomInfo.get(id)); } - Mono getChatRoomInfo(int shard, UUID id) + Mono getShardOwners() { - if (loadInProgress) - { - return Mono.error(new LoadInProgressException()); - } - - if (!isShardOwned[shard]) - { - return Mono.error(new ShardNotOwnedException(shard)); - } - - return Mono.justOrEmpty(chatRoomInfo[shard].get(id)); + return Mono.just(shardOwners); } }