--- /dev/null
+package de.juplo.kafka.chat.backend.domain;
+
+import reactor.core.publisher.Mono;
+
+
+public interface ShardingPublisherStrategy
+{
+ Mono<String> publishOwnership(int shard);
+}
private final long[] nextOffset;
private final Map<UUID, ChatRoomData>[] chatRoomData;
private final InfoChannel infoChannel;
+ private final ShardingPublisherStrategy shardingPublisherStrategy;
private boolean running;
@Getter
int numShards,
int bufferSize,
Clock clock,
- InfoChannel infoChannel)
+ InfoChannel infoChannel,
+ ShardingPublisherStrategy shardingPublisherStrategy)
{
log.debug(
"{}: Creating DataChannel for topic {} with {} partitions",
.range(0, numShards)
.forEach(shard -> this.chatRoomData[shard] = new HashMap<>());
this.infoChannel = infoChannel;
+ this.shardingPublisherStrategy = shardingPublisherStrategy;
}
consumer.seek(topicPartition, nextOffset[partition]);
infoChannel.sendShardAssignedEvent(partition);
+ shardingPublisherStrategy
+ .publishOwnership(partition)
+ .doOnNext(instanceId -> log.info(
+ "Instance {} was published as owner of shard {}",
+ instanceId,
+ partition))
+ .subscribe();
});
consumer.resume(partitions);
import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import reactor.core.publisher.Mono;
import java.time.Clock;
import java.time.ZoneId;
Consumer<String, AbstractMessageTo> dataChannelConsumer,
ZoneId zoneId,
Clock clock,
- InfoChannel infoChannel)
+ InfoChannel infoChannel,
+ ShardingPublisherStrategy shardingPublisherStrategy)
{
return new DataChannel(
properties.getInstanceId(),
properties.getKafka().getNumPartitions(),
properties.getChatroomBufferSize(),
clock,
- infoChannel);
+ infoChannel,
+ shardingPublisherStrategy);
}
@Bean
return properties;
}
+ @Bean
+ ShardingPublisherStrategy shardingPublisherStrategy()
+ {
+ return new ShardingPublisherStrategy() {
+ @Override
+ public Mono<String> publishOwnership(int shard)
+ {
+ return Mono.just(Integer.toString(shard));
+ }
+ };
+ }
+
@Bean
ZoneId zoneId()
{
package de.juplo.kafka.chat.backend.implementation.kafka;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
+import reactor.core.publisher.Mono;
import java.time.Clock;
import java.util.List;
@Import(KafkaServicesConfiguration.class)
public static class KafkaTestConfiguration
{
+ @Bean
+ public ShardingPublisherStrategy shardingPublisherStrategy()
+ {
+ return shard -> Mono.just("MOCKED!");
+ }
+
@Bean
public WorkAssignor dataChannelWorkAssignor(
ChatBackendProperties properties,