X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FInfoChannel.java;h=4711dbd9c1920ddd0e05306231e773a00c9e4050;hb=8d997cc65763b3f12fb680da67f471590e6eeeb2;hp=26e86963ec9b628bda647ca923350caedb3d1c20;hpb=efb070ce6e1e7ea1bb3297147b4e5a4bee3967cd;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index 26e86963..4711dbd9 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -3,16 +3,20 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.errors.WakeupException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.net.URI; import java.time.*; import java.util.HashMap; import java.util.Map; @@ -27,9 +31,11 @@ public class InfoChannel implements Runnable private final Producer producer; private final Consumer consumer; private final int numShards; + private final String[] shardOwners; private final long[] currentOffset; private final long[] nextOffset; private final Map chatRoomInfo; + private final String instanceUri; private boolean running; @@ -37,7 +43,8 @@ public class InfoChannel implements Runnable public InfoChannel( String topic, Producer producer, - Consumer infoChannelConsumer) + Consumer infoChannelConsumer, + URI instanceUri) { log.debug( "Creating InfoChannel for topic {}", @@ -50,11 +57,14 @@ public class InfoChannel implements Runnable this.numShards = consumer .partitionsFor(topic) .size(); + this.shardOwners = new String[numShards]; this.currentOffset = new long[numShards]; this.nextOffset = new long[numShards]; IntStream .range(0, numShards) .forEach(partition -> this.nextOffset[partition] = -1l); + + this.instanceUri = instanceUri.toASCIIString(); } @@ -83,7 +93,7 @@ public class InfoChannel implements Runnable { if (metadata != null) { - log.info("Successfully sent chreate-request for chat room: {}", to); + log.info("Successfully sent created event for chat chat-room: {}", to); ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard); sink.success(chatRoomInfo); } @@ -91,7 +101,7 @@ public class InfoChannel implements Runnable { // On send-failure log.error( - "Could not send create-request for chat room (id={}, name={}): {}", + "Could not send created event for chat-room (id={}, name={}): {}", chatRoomId, name, exception); @@ -101,6 +111,70 @@ public class InfoChannel implements Runnable }); } + Mono sendShardAssignedEvent(int shard) + { + EventShardAssigned to = EventShardAssigned.of(shard, instanceUri); + + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + topic, + Integer.toString(shard), + to); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + log.info("Successfully sent shard assigned event for shard: {}", shard); + sink.success(metadata); + } + else + { + // On send-failure + log.error( + "Could not send shard assigned event for shard {}: {}", + shard, + exception); + sink.error(exception); + } + })); + }); + } + + Mono sendShardRevokedEvent(int shard) + { + EventShardRevoked to = EventShardRevoked.of(shard, instanceUri); + + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + topic, + Integer.toString(shard), + to); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + log.info("Successfully sent shard revoked event for shard: {}", shard); + sink.success(metadata); + } + else + { + // On send-failure + log.error( + "Could not send shard revoked event for shard {}: {}", + shard, + exception); + sink.error(exception); + } + })); + }); + } + @Override public void run() @@ -146,6 +220,26 @@ public class InfoChannel implements Runnable createChatRoom(eventChatRoomCreated.toChatRoomInfo()); break; + case EVENT_SHARD_ASSIGNED: + EventShardAssigned eventShardAssigned = + (EventShardAssigned) record.value(); + log.info( + "Shard {} was assigned to {}", + eventShardAssigned.getShard(), + eventShardAssigned.getUri()); + shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri(); + break; + + case EVENT_SHARD_REVOKED: + EventShardRevoked eventShardRevoked = + (EventShardRevoked) record.value(); + log.info( + "Shard {} was revoked from {}", + eventShardRevoked.getShard(), + eventShardRevoked.getUri()); + shardOwners[eventShardRevoked.getShard()] = null; + break; + default: log.debug( "Ignoring message for key={} with offset={}: {}", @@ -190,4 +284,9 @@ public class InfoChannel implements Runnable { return Mono.fromSupplier(() -> chatRoomInfo.get(id)); } + + Mono getShardOwners() + { + return Mono.just(shardOwners); + } }