import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
+import java.net.URI;
import java.nio.file.Paths;
{
private ShardingStrategyType shardingStrategy = ShardingStrategyType.none;
private int numShards = 1;
- private int[] ownedShards = new int[] { 0 };
+ private int[] ownedShards = new int[0];
+ private URI[] shardOwners = new URI[0];
private StorageStrategyType storageStrategy = StorageStrategyType.none;
private String storageDirectory = Paths.get(System.getProperty("java.io.tmpdir"),"chat", "backend").toString();
}
@Setter
public static class KafkaServicesProperties
{
+ private URI instanceUri = URI.create("http://localhost:8080");
private String clientIdPrefix;
private String bootstrapServers = ":9092";
private String infoChannelTopic = "info_channel";
import de.juplo.kafka.chat.backend.domain.*;
import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import java.net.URI;
import java.time.Clock;
import java.util.*;
private final Map<UUID, ChatRoomData> chatRoomData;
private final Clock clock;
private final int bufferSize;
+ @Getter
+ private final URI instanceUri;
public SimpleChatHomeService(
StorageStrategy storageStrategy,
Clock clock,
- int bufferSize)
+ int bufferSize,
+ URI instanceUri)
{
this(
null,
storageStrategy,
clock,
- bufferSize);
+ bufferSize,
+ instanceUri);
}
public SimpleChatHomeService(
Integer shard,
StorageStrategy storageStrategy,
Clock clock,
- int bufferSize)
+ int bufferSize,
+ URI instanceUri)
{
log.info("Created SimpleChatHome for shard {}", shard);
;
});
this.clock = clock;
this.bufferSize = bufferSize;
+ this.instanceUri = instanceUri;
}
.justOrEmpty(chatRoomData.get(id))
.switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
}
+
+ @Override
+ public Mono<String[]> getShardOwners()
+ {
+ return Mono.empty();
+ }
}
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.WakeupException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import java.net.URI;
import java.time.*;
import java.util.HashMap;
import java.util.Map;
private final long[] currentOffset;
private final long[] nextOffset;
private final Map<UUID, ChatRoomInfo> chatRoomInfo;
+ private final String instanceUri;
private boolean running;
public InfoChannel(
String topic,
Producer<String, AbstractMessageTo> producer,
- Consumer<String, AbstractMessageTo> infoChannelConsumer)
+ Consumer<String, AbstractMessageTo> infoChannelConsumer,
+ URI instanceUri)
{
log.debug(
"Creating InfoChannel for topic {}",
IntStream
.range(0, numShards)
.forEach(partition -> this.nextOffset[partition] = -1l);
+
+ this.instanceUri = instanceUri.toASCIIString();
}
{
if (metadata != null)
{
- log.info("Successfully sent chreate-request for chat room: {}", to);
+ log.info("Successfully sent created event for chat chat-room: {}", to);
ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard);
sink.success(chatRoomInfo);
}
{
// On send-failure
log.error(
- "Could not send create-request for chat room (id={}, name={}): {}",
+ "Could not send created event for chat-room (id={}, name={}): {}",
chatRoomId,
name,
exception);
});
}
+ Mono<RecordMetadata> sendShardAssignedEvent(int shard)
+ {
+ EventShardAssigned to = EventShardAssigned.of(shard, instanceUri);
+
+ return Mono.create(sink ->
+ {
+ ProducerRecord<String, AbstractMessageTo> record =
+ new ProducerRecord<>(
+ topic,
+ Integer.toString(shard),
+ to);
+
+ producer.send(record, ((metadata, exception) ->
+ {
+ if (metadata != null)
+ {
+ log.info("Successfully sent shard assigned event for shard: {}", shard);
+ sink.success(metadata);
+ }
+ else
+ {
+ // On send-failure
+ log.error(
+ "Could not send shard assigned event for shard {}: {}",
+ shard,
+ exception);
+ sink.error(exception);
+ }
+ }));
+ });
+ }
+
+ Mono<RecordMetadata> sendShardRevokedEvent(int shard)
+ {
+ EventShardRevoked to = EventShardRevoked.of(shard, instanceUri);
+
+ return Mono.create(sink ->
+ {
+ ProducerRecord<String, AbstractMessageTo> record =
+ new ProducerRecord<>(
+ topic,
+ Integer.toString(shard),
+ to);
+
+ producer.send(record, ((metadata, exception) ->
+ {
+ if (metadata != null)
+ {
+ log.info("Successfully sent shard revoked event for shard: {}", shard);
+ sink.success(metadata);
+ }
+ else
+ {
+ // On send-failure
+ log.error(
+ "Could not send shard revoked event for shard {}: {}",
+ shard,
+ exception);
+ sink.error(exception);
+ }
+ }));
+ });
+ }
+
@Override
public void run()
createChatRoom(eventChatRoomCreated.toChatRoomInfo());
break;
+ case EVENT_SHARD_ASSIGNED:
+ EventShardAssigned eventShardAssigned =
+ (EventShardAssigned) record.value();
+ break;
+
+ case EVENT_SHARD_REVOKED:
+ EventShardRevoked eventShardRevoked =
+ (EventShardRevoked) record.value();
+ break;
+
default:
log.debug(
"Ignoring message for key={} with offset={}: {}",