X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FInfoChannel.java;h=95f7fb064b1271459102f79cd9f84dd9a245b61e;hb=8300dcd98f681893a077051560151a8f1b94e38d;hp=26e86963ec9b628bda647ca923350caedb3d1c20;hpb=8b682010e0052a5c036814e64e5eb94122ed52ce;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index 26e86963..95f7fb06 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -3,6 +3,10 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked; +import lombok.Getter; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -13,31 +17,43 @@ import org.apache.kafka.common.errors.WakeupException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.time.*; +import java.net.URI; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.stream.IntStream; +@ToString(of = { "topic", "instanceUri" }) @Slf4j -public class InfoChannel implements Runnable +public class InfoChannel implements Channel { private final String topic; private final Producer producer; private final Consumer consumer; + private final Duration pollingInterval; private final int numShards; + private final String[] shardOwners; private final long[] currentOffset; private final long[] nextOffset; private final Map chatRoomInfo; + private final String instanceUri; + private final ChannelMediator channelMediator; private boolean running; + @Getter + private volatile ChannelState channelState = ChannelState.STARTING; public InfoChannel( String topic, Producer producer, - Consumer infoChannelConsumer) + Consumer infoChannelConsumer, + Duration pollingInterval, + int numShards, + URI instanceUri, + ChannelMediator channelMediator) { log.debug( "Creating InfoChannel for topic {}", @@ -47,24 +63,22 @@ public class InfoChannel implements Runnable this.producer = producer; this.chatRoomInfo = new HashMap<>(); - this.numShards = consumer - .partitionsFor(topic) - .size(); + this.pollingInterval = pollingInterval; + + this.numShards = numShards; + this.shardOwners = new String[numShards]; this.currentOffset = new long[numShards]; this.nextOffset = new long[numShards]; IntStream .range(0, numShards) .forEach(partition -> this.nextOffset[partition] = -1l); - } + this.instanceUri = instanceUri.toASCIIString(); - boolean loadInProgress() - { - return IntStream - .range(0, numShards) - .anyMatch(partition -> nextOffset[partition] < currentOffset[partition]); + this.channelMediator = channelMediator; } + Mono sendChatRoomCreatedEvent( UUID chatRoomId, String name, @@ -81,9 +95,9 @@ public class InfoChannel implements Runnable producer.send(record, ((metadata, exception) -> { - if (metadata != null) + if (exception == null) { - log.info("Successfully sent chreate-request for chat room: {}", to); + log.info("Successfully sent created event for chat chat-room: {}", to); ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard); sink.success(chatRoomInfo); } @@ -91,7 +105,7 @@ public class InfoChannel implements Runnable { // On send-failure log.error( - "Could not send create-request for chat room (id={}, name={}): {}", + "Could not send created event for chat-room (id={}, name={}): {}", chatRoomId, name, exception); @@ -101,6 +115,70 @@ public class InfoChannel implements Runnable }); } + void sendShardAssignedEvent(int shard) + { + EventShardAssigned to = EventShardAssigned.of(shard, instanceUri); + + ProducerRecord record = + new ProducerRecord<>( + topic, + Integer.toString(shard), + to); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + log.info("Successfully sent shard assigned event for shard: {}", shard); + } + else + { + // On send-failure + log.error( + "Could not send shard assigned event for shard {}: {}", + shard, + exception); + // TODO: + // Verhalten im Fehlerfall durchdenken! + // Z.B.: unsubscribe() und darauf folgendes (re-)subscribe() des + // Consumers veranlassen, so dass die nicht öffentlich Bekannte + // Zuständigkeit abgegeben und neu zugeordnet wird? + // Falls der Weg gegangen wird: Achtung wegen Sticke Partitions! + } + })); + } + + void sendShardRevokedEvent(int shard) + { + EventShardRevoked to = EventShardRevoked.of(shard, instanceUri); + + ProducerRecord record = + new ProducerRecord<>( + topic, + Integer.toString(shard), + to); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + log.info("Successfully sent shard revoked event for shard: {}", shard); + } + else + { + // On send-failure + log.error( + "Could not send shard revoked event for shard {}: {}", + shard, + exception); + // TODO: + // Verhalten im Fehlerfall durchdenken! + // Ggf. einfach egal, da die neue zuständige Instanz den + // nicht gelöschten Eintrag eh überschreibt? + } + })); + } + @Override public void run() @@ -115,18 +193,24 @@ public class InfoChannel implements Runnable IntStream .range(0, numShards) .forEach(partition -> this.nextOffset[partition] = 0l); + channelState = ChannelState.LOAD_IN_PROGRESS; while (running) { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); + ConsumerRecords records = consumer.poll(pollingInterval); log.debug("Fetched {} messages", records.count()); - handleMessages(records); + for (ConsumerRecord record : records) + { + handleMessage(record); + updateNextOffset(record.partition(), record.offset() + 1); + } } catch (WakeupException e) { log.info("Received WakeupException, exiting!"); + channelState = ChannelState.SHUTTING_DOWN; running = false; } } @@ -134,27 +218,58 @@ public class InfoChannel implements Runnable log.info("Exiting normally"); } - private void handleMessages(ConsumerRecords records) + private void updateNextOffset(int partition, long nextOffset) { - for (ConsumerRecord record : records) + this.nextOffset[partition] = nextOffset; + if (channelState == ChannelState.LOAD_IN_PROGRESS) { - switch (record.value().getType()) + boolean loadInProgress = IntStream + .range(0, numShards) + .anyMatch(shard -> this.nextOffset[shard] < currentOffset[shard]); + if (!loadInProgress) { - case EVENT_CHATROOM_CREATED: - EventChatRoomCreated eventChatRoomCreated = - (EventChatRoomCreated) record.value(); - createChatRoom(eventChatRoomCreated.toChatRoomInfo()); - break; - - default: - log.debug( - "Ignoring message for key={} with offset={}: {}", - record.key(), - record.offset(), - record.value()); + log.info("Loading of info completed! Resuming normal operations..."); + channelState = ChannelState.READY; } + } + } + + private void handleMessage(ConsumerRecord record) + { + switch (record.value().getType()) + { + case EVENT_CHATROOM_CREATED: + EventChatRoomCreated eventChatRoomCreated = + (EventChatRoomCreated) record.value(); + createChatRoom(eventChatRoomCreated.toChatRoomInfo()); + break; + + case EVENT_SHARD_ASSIGNED: + EventShardAssigned eventShardAssigned = + (EventShardAssigned) record.value(); + log.info( + "Shard {} was assigned to {}", + eventShardAssigned.getShard(), + eventShardAssigned.getUri()); + shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri(); + break; - nextOffset[record.partition()] = record.offset() + 1; + case EVENT_SHARD_REVOKED: + EventShardRevoked eventShardRevoked = + (EventShardRevoked) record.value(); + log.info( + "Shard {} was revoked from {}", + eventShardRevoked.getShard(), + eventShardRevoked.getUri()); + shardOwners[eventShardRevoked.getShard()] = null; + break; + + default: + log.debug( + "Ignoring message for key={} with offset={}: {}", + record.key(), + record.offset(), + record.value()); } } @@ -178,6 +293,7 @@ public class InfoChannel implements Runnable chatRoomId); this.chatRoomInfo.put(chatRoomId, chatRoomInfo); + this.channelMediator.chatRoomCreated(chatRoomInfo); } } @@ -188,6 +304,17 @@ public class InfoChannel implements Runnable Mono getChatRoomInfo(UUID id) { + ChannelState capturedState = channelState; + if (capturedState != ChannelState.READY) + { + return Mono.error(new ChannelNotReadyException(capturedState)); + } + return Mono.fromSupplier(() -> chatRoomInfo.get(id)); } + + Mono getShardOwners() + { + return Mono.just(shardOwners); + } }