X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FInfoChannel.java;h=26e86963ec9b628bda647ca923350caedb3d1c20;hb=9b0879d7ef3a4811ef48fb9190558937a07f1194;hp=d94bc659af1368565a3e14c33eb0b40583e1ff3f;hpb=6500199fa457b2c94497952ea0aaf8be6de5cdda;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index d94bc659..26e86963 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -1,106 +1,90 @@ package de.juplo.kafka.chat.backend.implementation.kafka; -import de.juplo.kafka.chat.backend.domain.*; -import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; -import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo; -import lombok.Getter; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.*; -import java.util.*; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; import java.util.stream.IntStream; @Slf4j -public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener +public class InfoChannel implements Runnable { private final String topic; private final Producer producer; private final Consumer consumer; - private final ZoneId zoneId; private final int numShards; - private final int bufferSize; - private final Clock clock; - private final boolean[] isShardOwned; private final long[] currentOffset; private final long[] nextOffset; - private final Map[] chatRoomInfo; - private final Map[] chatRoomData; + private final Map chatRoomInfo; private boolean running; - @Getter - private volatile boolean loadInProgress; - public ChatRoomChannel( + public InfoChannel( String topic, Producer producer, - Consumer consumer, - ZoneId zoneId, - int numShards, - int bufferSize, - Clock clock) + Consumer infoChannelConsumer) { log.debug( - "Creating ChatRoomChannel for topic {} with {} partitions", - topic, - numShards); + "Creating InfoChannel for topic {}", + topic); this.topic = topic; - this.consumer = consumer; + this.consumer = infoChannelConsumer; this.producer = producer; - this.zoneId = zoneId; - this.numShards = numShards; - this.bufferSize = bufferSize; - this.clock = clock; - this.isShardOwned = new boolean[numShards]; + this.chatRoomInfo = new HashMap<>(); + + this.numShards = consumer + .partitionsFor(topic) + .size(); this.currentOffset = new long[numShards]; this.nextOffset = new long[numShards]; - this.chatRoomInfo = new Map[numShards]; - this.chatRoomData = new Map[numShards]; IntStream .range(0, numShards) - .forEach(shard -> - { - this.chatRoomInfo[shard] = new HashMap<>(); - this.chatRoomData[shard] = new HashMap<>(); - }); + .forEach(partition -> this.nextOffset[partition] = -1l); } + boolean loadInProgress() + { + return IntStream + .range(0, numShards) + .anyMatch(partition -> nextOffset[partition] < currentOffset[partition]); + } - Mono sendCreateChatRoomRequest( + Mono sendChatRoomCreatedEvent( UUID chatRoomId, - String name) + String name, + int shard) { - CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name); + EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard); return Mono.create(sink -> { ProducerRecord record = new ProducerRecord<>( topic, - chatRoomId.toString(), - createChatRoomRequestTo); + Integer.toString(shard), + to); producer.send(record, ((metadata, exception) -> { if (metadata != null) { - log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo); - ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition()); - createChatRoom(chatRoomInfo); + log.info("Successfully sent chreate-request for chat room: {}", to); + ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard); sink.success(chatRoomInfo); } else @@ -117,122 +101,28 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener }); } - Mono sendChatMessage( - UUID chatRoomId, - Message.MessageKey key, - LocalDateTime timestamp, - String text) - { - ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); - return Mono.create(sink -> - { - ProducerRecord record = - new ProducerRecord<>( - topic, - null, - zdt.toEpochSecond(), - chatRoomId.toString(), - EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text)); - - producer.send(record, ((metadata, exception) -> - { - if (metadata != null) - { - // On successful send - Message message = new Message(key, metadata.offset(), timestamp, text); - log.info("Successfully send message {}", message); - sink.success(message); - } - else - { - // On send-failure - log.error( - "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}", - chatRoomId, - key, - timestamp, - text, - exception); - sink.error(exception); - } - })); - }); - } - - @Override - public void onPartitionsAssigned(Collection partitions) - { - log.info("Newly assigned partitions! Pausing normal operations..."); - loadInProgress = true; - - consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) -> - { - int partition = topicPartition.partition(); - isShardOwned[partition] = true; - this.currentOffset[partition] = currentOffset; - - log.info( - "Partition assigned: {} - loading messages: next={} -> current={}", - partition, - nextOffset[partition], - currentOffset); - - consumer.seek(topicPartition, nextOffset[partition]); - }); - - consumer.resume(partitions); - } - - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions.forEach(topicPartition -> - { - int partition = topicPartition.partition(); - isShardOwned[partition] = false; - log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); - }); - } - - @Override - public void onPartitionsLost(Collection partitions) - { - log.warn("Lost partitions: {}, partitions"); - // TODO: Muss auf den Verlust anders reagiert werden? - onPartitionsRevoked(partitions); - } @Override public void run() { running = true; + consumer + .endOffsets(consumer.assignment()) + .entrySet() + .stream() + .forEach(entry -> this.currentOffset[entry.getKey().partition()] = entry.getValue()); + IntStream + .range(0, numShards) + .forEach(partition -> this.nextOffset[partition] = 0l); + while (running) { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); - log.info("Fetched {} messages", records.count()); - - if (loadInProgress) - { - loadChatRoom(records); - - if (isLoadingCompleted()) - { - log.info("Loading of messages completed! Pausing all owned partitions..."); - pauseAllOwnedPartions(); - log.info("Resuming normal operations..."); - loadInProgress = false; - } - } - else - { - if (!records.isEmpty()) - { - throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!"); - } - } + ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); + log.debug("Fetched {} messages", records.count()); + handleMessages(records); } catch (WakeupException e) { @@ -244,36 +134,22 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener log.info("Exiting normally"); } - private void loadChatRoom(ConsumerRecords records) + private void handleMessages(ConsumerRecords records) { for (ConsumerRecord record : records) { - UUID chatRoomId = UUID.fromString(record.key()); - switch (record.value().getType()) { - case COMMAND_CREATE_CHATROOM: - createChatRoom( - chatRoomId, - (CommandCreateChatRoomTo) record.value(), - record.partition()); - break; - - case EVENT_CHATMESSAGE_RECEIVED: - Instant instant = Instant.ofEpochSecond(record.timestamp()); - LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); - loadChatMessage( - chatRoomId, - timestamp, - record.offset(), - (EventChatMessageReceivedTo) record.value(), - record.partition()); + case EVENT_CHATROOM_CREATED: + EventChatRoomCreated eventChatRoomCreated = + (EventChatRoomCreated) record.value(); + createChatRoom(eventChatRoomCreated.toChatRoomInfo()); break; default: log.debug( - "Ignoring message for chat-room {} with offset {}: {}", - chatRoomId, + "Ignoring message for key={} with offset={}: {}", + record.key(), record.offset(), record.value()); } @@ -282,84 +158,12 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener } } - private void createChatRoom( - UUID chatRoomId, - CommandCreateChatRoomTo createChatRoomRequestTo, - Integer partition) - { - log.info( - "Loading ChatRoom {} for shard {} with buffer-size {}", - chatRoomId, - partition, - bufferSize); - KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); - ChatRoomData chatRoomData = new ChatRoomData( - clock, - service, - bufferSize); - putChatRoom( - chatRoomId, - createChatRoomRequestTo.getName(), - partition, - chatRoomData); - } - - private void createChatRoom(ChatRoomInfo chatRoomInfo) { - UUID id = chatRoomInfo.getId(); - log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); - KafkaChatMessageService service = new KafkaChatMessageService(this, id); - ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize); - putChatRoom( - chatRoomInfo.getId(), - chatRoomInfo.getName(), - chatRoomInfo.getShard(), - chatRoomData); - } - - private void loadChatMessage( - UUID chatRoomId, - LocalDateTime timestamp, - long offset, - EventChatMessageReceivedTo chatMessageTo, - int partition) - { - Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); - Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); - - ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId); - KafkaChatMessageService kafkaChatRoomService = - (KafkaChatMessageService) chatRoomData.getChatRoomService(); - - kafkaChatRoomService.persistMessage(message); - } - - private boolean isLoadingCompleted() - { - return IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]); - } - - private void pauseAllOwnedPartions() - { - consumer.pause(IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .mapToObj(shard -> new TopicPartition(topic, shard)) - .toList()); - } + UUID chatRoomId = chatRoomInfo.getId(); + Integer partition = chatRoomInfo.getShard(); - - private void putChatRoom( - UUID chatRoomId, - String name, - Integer partition, - ChatRoomData chatRoomData) - { - if (this.chatRoomInfo[partition].containsKey(chatRoomId)) + if (this.chatRoomInfo.containsKey(chatRoomId)) { log.warn( "Ignoring existing chat-room for {}: {}", @@ -369,60 +173,21 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener else { log.info( - "Adding new chat-room to partition {}: {}", + "Adding new chat-room for partition {}: {}", partition, - chatRoomData); - - this.chatRoomInfo[partition].put( - chatRoomId, - new ChatRoomInfo(chatRoomId, name, partition)); - this.chatRoomData[partition].put(chatRoomId, chatRoomData); - } - } - - int[] getOwnedShards() - { - return IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .toArray(); - } - - Mono getChatRoomData(int shard, UUID id) - { - if (loadInProgress) - { - return Mono.error(new LoadInProgressException()); - } + chatRoomId); - if (!isShardOwned[shard]) - { - return Mono.error(new ShardNotOwnedException(shard)); + this.chatRoomInfo.put(chatRoomId, chatRoomInfo); } - - return Mono.justOrEmpty(chatRoomData[shard].get(id)); } Flux getChatRoomInfo() { - return Flux - .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i))) - .filter(shard -> isShardOwned[shard]) - .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values())); + return Flux.fromIterable(chatRoomInfo.values()); } - Mono getChatRoomInfo(int shard, UUID id) + Mono getChatRoomInfo(UUID id) { - if (loadInProgress) - { - return Mono.error(new LoadInProgressException()); - } - - if (!isShardOwned[shard]) - { - return Mono.error(new ShardNotOwnedException(shard)); - } - - return Mono.justOrEmpty(chatRoomInfo[shard].get(id)); + return Mono.fromSupplier(() -> chatRoomInfo.get(id)); } }