From: Kai Moritz Date: Tue, 12 Sep 2023 21:28:02 +0000 (+0200) Subject: WIP:ALIGN X-Git-Tag: rebase--2023-09-14--22-59~10 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=f3c228fd1d26f4d7b58383df5a64ad7b15169098;p=demos%2Fkafka%2Fchat WIP:ALIGN --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index 85984642..a7899e43 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -4,7 +4,6 @@ import de.juplo.kafka.chat.backend.domain.*; import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; import lombok.Getter; import lombok.extern.slf4j.Slf4j; diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index 97739b53..22c4668d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -1,13 +1,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka; -import de.juplo.kafka.chat.backend.domain.ChatRoomData; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; -import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; -import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; @@ -15,7 +9,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -33,6 +26,9 @@ public class InfoChannel implements Runnable private final String topic; private final Producer producer; private final Consumer consumer; + private final int numShards; + private final long[] startOffset; + private final long[] currentOffset; private final Map chatRoomInfo; private boolean running; @@ -50,28 +46,50 @@ public class InfoChannel implements Runnable this.consumer = infoChannelConsumer; this.producer = producer; this.chatRoomInfo = new HashMap<>(); + + this.numShards = consumer + .partitionsFor(topic) + .size(); + this.startOffset = new long[numShards]; + this.currentOffset = new long[numShards]; + IntStream + .range(0, numShards) + .forEach(partition -> this.currentOffset[partition] = -1l); + consumer + .endOffsets(consumer.assignment()) + .entrySet() + .stream() + .forEach(entry -> this.startOffset[entry.getKey().partition()] = entry.getValue()); } + Mono loadInProgress() + { + return Mono + .fromSupplier(() -> IntStream + .range(0, numShards) + .anyMatch(partition -> currentOffset[partition] < startOffset[partition])); + } Mono sendChatRoomCreatedEvent( UUID chatRoomId, - String name) + String name, + int shard) { - EventChatRoomCreated to = EventChatRoomCreated.of(name); + EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard); return Mono.create(sink -> { ProducerRecord record = new ProducerRecord<>( - topic<, - chatRoomId.toString(), + topic, + Integer.toString(shard), to); producer.send(record, ((metadata, exception) -> { if (metadata != null) { - log.info("Successfully send chreate-request for chat room: {}", to); + log.info("Successfully sent chreate-request for chat room: {}", to); ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition()); createChatRoom(chatRoomInfo); sink.success(chatRoomInfo); @@ -100,28 +118,9 @@ public class InfoChannel implements Runnable { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); - log.info("Fetched {} messages", records.count()); - - if (loadInProgress) - { - loadChatRoom(records); - - if (isLoadingCompleted()) - { - log.info("Loading of messages completed! Pausing all owned partitions..."); - pauseAllOwnedPartions(); - log.info("Resuming normal operations..."); - loadInProgress = false; - } - } - else - { - if (!records.isEmpty()) - { - throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!"); - } - } + ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); + log.debug("Fetched {} messages", records.count()); + handleMessages(records); } catch (WakeupException e) { @@ -133,122 +132,36 @@ public class InfoChannel implements Runnable log.info("Exiting normally"); } - private void loadChatRoom(ConsumerRecords records) + private void handleMessages(ConsumerRecords records) { for (ConsumerRecord record : records) { - UUID chatRoomId = UUID.fromString(record.key()); - switch (record.value().getType()) { case EVENT_CHATROOM_CREATED: - createChatRoom( - chatRoomId, - (CommandCreateChatRoomTo) record.value(), - record.partition()); - break; - - case EVENT_CHATMESSAGE_RECEIVED: - Instant instant = Instant.ofEpochSecond(record.timestamp()); - LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); - loadChatMessage( - chatRoomId, - timestamp, - record.offset(), - (EventChatMessageReceivedTo) record.value(), - record.partition()); + EventChatRoomCreated eventChatRoomCreated = + (EventChatRoomCreated) record.value(); + createChatRoom(eventChatRoomCreated.toChatRoomInfo()); break; default: log.debug( - "Ignoring message for chat-room {} with offset {}: {}", - chatRoomId, + "Ignoring message for key={} with offset={}: {}", + record.key(), record.offset(), record.value()); } - nextOffset[record.partition()] = record.offset() + 1; + startOffset[record.partition()] = record.offset() + 1; } } - private void createChatRoom( - UUID chatRoomId, - CommandCreateChatRoomTo createChatRoomRequestTo, - Integer partition) - { - log.info( - "Loading ChatRoom {} for shard {} with buffer-size {}", - chatRoomId, - partition, - bufferSize); - KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId); - ChatRoomData chatRoomData = new ChatRoomData( - clock, - service, - bufferSize); - putChatRoom( - chatRoomId, - createChatRoomRequestTo.getName(), - partition, - chatRoomData); - } - - private void createChatRoom(ChatRoomInfo chatRoomInfo) { - UUID id = chatRoomInfo.getId(); - log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); - KafkaChatMessageService service = new KafkaChatMessageService(this, id); - ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize); - putChatRoom( - chatRoomInfo.getId(), - chatRoomInfo.getName(), - chatRoomInfo.getShard(), - chatRoomData); - } - - private void loadChatMessage( - UUID chatRoomId, - LocalDateTime timestamp, - long offset, - EventChatMessageReceivedTo chatMessageTo, - int partition) - { - Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); - Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); + UUID chatRoomId = chatRoomInfo.getId(); + Integer partition = chatRoomInfo.getShard(); - ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId); - KafkaChatMessageService kafkaChatRoomService = - (KafkaChatMessageService) chatRoomData.getChatRoomService(); - - kafkaChatRoomService.persistMessage(message); - } - - private boolean isLoadingCompleted() - { - return IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]); - } - - private void pauseAllOwnedPartions() - { - consumer.pause(IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .mapToObj(shard -> new TopicPartition(topic, shard)) - .toList()); - } - - - private void putChatRoom( - UUID chatRoomId, - String name, - Integer partition, - ChatRoomData chatRoomData) - { - if (this.chatRoomInfo[partition].containsKey(chatRoomId)) + if (this.chatRoomInfo.containsKey(chatRoomId)) { log.warn( "Ignoring existing chat-room for {}: {}", @@ -258,60 +171,21 @@ public class InfoChannel implements Runnable else { log.info( - "Adding new chat-room to partition {}: {}", + "Adding new chat-room for partition {}: {}", partition, - chatRoomData); - - this.chatRoomInfo[partition].put( - chatRoomId, - new ChatRoomInfo(chatRoomId, name, partition)); - this.chatRoomData[partition].put(chatRoomId, chatRoomData); - } - } - - int[] getOwnedShards() - { - return IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .toArray(); - } - - Mono getChatRoomData(int shard, UUID id) - { - if (loadInProgress) - { - return Mono.error(new LoadInProgressException()); - } + chatRoomId); - if (!isShardOwned[shard]) - { - return Mono.error(new ShardNotOwnedException(shard)); + this.chatRoomInfo.put(chatRoomId, chatRoomInfo); } - - return Mono.justOrEmpty(chatRoomData[shard].get(id)); } Flux getChatRoomInfo() { - return Flux - .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i))) - .filter(shard -> isShardOwned[shard]) - .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values())); + return Flux.fromIterable(chatRoomInfo.values()); } - Mono getChatRoomInfo(int shard, UUID id) + Mono getChatRoomInfo(UUID id) { - if (loadInProgress) - { - return Mono.error(new LoadInProgressException()); - } - - if (!isShardOwned[shard]) - { - return Mono.error(new ShardNotOwnedException(shard)); - } - - return Mono.justOrEmpty(chatRoomInfo[shard].get(id)); + return Mono.fromSupplier(() -> chatRoomInfo.get(id)); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java index 707e843b..d0b8eff7 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java @@ -26,16 +26,20 @@ public class KafkaChatHomeService implements ChatHomeService @Override public Mono createChatRoom(UUID id, String name) { - log.info("Sending create-command for chat rooom: id={}, name={}"); - return infoChannel.sendChatRoomCreatedEvent(id, name); + int shard = selectShard(id); + log.info( + "Sending create-command for chat rooom: id={}, name={}, shard={}", + id, + name, + shard); + return infoChannel.sendChatRoomCreatedEvent(id, name, shard); } @Override public Mono getChatRoomInfo(UUID id) { - int shard = selectShard(id); return infoChannel - .getChatRoomInfo(shard, id) + .getChatRoomInfo(id) .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( id, shard, diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java index b5cf458b..ae5a5019 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java @@ -1,11 +1,14 @@ package de.juplo.kafka.chat.backend.implementation.kafka.messages.info; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; import lombok.ToString; +import java.util.UUID; + @Getter @Setter @@ -24,5 +27,19 @@ public class EventChatRoomCreated extends AbstractMessageTo } - public static EventChatRoomCreated of(String ) + public ChatRoomInfo toChatRoomInfo() + { + return new ChatRoomInfo(UUID.fromString(id), name, shard); + } + + public static EventChatRoomCreated of(UUID id, String name, Integer shard) + { + EventChatRoomCreated event = new EventChatRoomCreated(); + + event.setId(id.toString()); + event.setName(name); + event.setShard(shard); + + return event; + } }