import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.WakeupException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
{
private final String topic;
- private final Consumer<String, MessageTo> consumer;
private final Producer<String, MessageTo> producer;
+ private final Consumer<String, MessageTo> consumer;
private final ZoneId zoneId;
private final int numShards;
private final boolean[] isShardOwned;
public ChatMessageChannel(
String topic,
- Consumer<String, MessageTo> consumer,
Producer<String, MessageTo> producer,
+ Consumer<String, MessageTo> consumer,
ZoneId zoneId,
int numShards)
{
return Mono.justOrEmpty(chatrooms[shard].get(id));
}
- Flux<ChatRoom> getChatRooms(int shard)
+ Flux<ChatRoom> getChatRooms()
{
- return Flux.fromStream(chatrooms[shard].values().stream());
+ return Flux.fromStream(IntStream
+ .range(0, numShards)
+ .filter(shard -> isShardOwned[shard])
+ .mapToObj(shard -> Integer.valueOf(shard))
+ .flatMap(shard -> chatrooms[shard].values().stream()));
}
}
public class ChatRoomChannel implements Runnable
{
private final String topic;
- private final Consumer<Integer, ChatRoomTo> consumer;
private final Producer<Integer, ChatRoomTo> producer;
+ private final Consumer<Integer, ChatRoomTo> consumer;
private final ShardingStrategy shardingStrategy;
private final ChatMessageChannel chatMessageChannel;
private final Clock clock;
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.*;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class KafkaChatHome implements ChatHome
+{
+ private final ShardingStrategy shardingStrategy;
+ private final ChatMessageChannel chatMessageChanel;
+
+
+ @Override
+ public Mono<ChatRoom> getChatRoom(UUID id)
+ {
+ int shard = shardingStrategy.selectShard(id);
+ if (chatMessageChanel.isLoadInProgress())
+ {
+ throw new LoadInProgressException(shard);
+ }
+ else
+ {
+ return chatMessageChanel.getChatRoom(shard, id);
+ }
+ }
+
+ @Override
+ public Flux<ChatRoom> getChatRooms()
+ {
+ if (chatMessageChanel.isLoadInProgress())
+ {
+ throw new LoadInProgressException();
+ }
+ else
+ {
+ return chatMessageChanel.getChatRooms();
+ }
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.*;
-
-
-@Slf4j
-public class KafkaChatHomeService implements ChatHomeService
-{
- private final ChatMessageChannel chatMessageChanel;
-
-
- public KafkaChatHomeService(ChatMessageChannel chatMessageChannel)
- {
- log.debug("Creating KafkaChatHomeService");
- this.chatMessageChanel = chatMessageChannel;
- }
-
-
- @Override
- public Mono<ChatRoom> getChatRoom(int shard, UUID id)
- {
- if (chatMessageChanel.isLoadInProgress())
- {
- throw new ShardNotOwnedException(shard);
- }
- else
- {
- return chatMessageChanel.getChatRoom(shard, id);
- }
- }
-
- @Override
- public Flux<ChatRoom> getChatRooms(int shard)
- {
- if (chatMessageChanel.isLoadInProgress())
- {
- throw new ShardNotOwnedException(shard);
- }
- else
- {
- return chatMessageChanel.getChatRooms(shard);
- }
- }
-}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
+
+
+public class LoadInProgressException extends ShardNotOwnedException
+{
+ public LoadInProgressException()
+ {
+ this(-1);
+ }
+
+ public LoadInProgressException(int shard)
+ {
+ super(shard);
+ }
+}
import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Clock;
+import java.time.ZoneId;
@ConditionalOnProperty(
{
@Bean
ChatHome kafkaChatHome(
- ChatBackendProperties properties,
- KafkaChatHomeService chatHomeService)
+ ShardingStrategy shardingStrategy,
+ ChatMessageChannel chatMessageChannel)
{
- int numShards = properties.getInmemory().getNumShards();
- SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
- for (int shard = 0; shard < numShards; shard++)
- {
-
- }
- .read()
- .subscribe(chatRoom ->
- {
- int shard = chatRoom.getShard();
- if (chatHomes[shard] == null)
- chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
- });
- ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
- return new ShardedChatHome(chatHomes, strategy);
+ return new KafkaChatHome(shardingStrategy, chatMessageChannel);
}
@Bean
- KafkaChatHomeService kafkaChatHomeService(ChatBackendProperties properties)
+ KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
{
- ShardingStrategyType sharding =
- properties.getInmemory().getShardingStrategy();
- int numShards = sharding == ShardingStrategyType.none
- ? 1
- : properties.getInmemory().getNumShards();
- int[] ownedShards = sharding == ShardingStrategyType.none
- ? new int[] { 0 }
- : properties.getInmemory().getOwnedShards();
- return new InMemoryChatHomeService(
- numShards,
- ownedShards,
- storageStrategy.read());
+ return new KafkaChatRoomFactory(chatRoomChannel);
}
@Bean
- InMemoryChatRoomFactory chatRoomFactory(
- InMemoryChatHomeService service,
- ShardingStrategy strategy,
- Clock clock,
- ChatBackendProperties properties)
+ ChatRoomChannel chatRoomChannel(
+ ChatBackendProperties properties,
+ Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
+ Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
+ ShardingStrategy shardingStrategy,
+ ChatMessageChannel chatMessageChannel,
+ Clock clock)
{
- return new InMemoryChatRoomFactory(
- service,
- strategy,
+ return new ChatRoomChannel(
+ properties.getKafka().getTopic(),
+ chatRoomChannelProducer,
+ chatRoomChannelConsumer,
+ shardingStrategy,
+ chatMessageChannel,
clock,
properties.getChatroomBufferSize());
}
@Bean
- ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
+ ChatMessageChannel chatMessageChannel(
+ ChatBackendProperties properties,
+ Producer<String, MessageTo> chatMessageChannelProducer,
+ Consumer<String, MessageTo> chatMessageChannelConsumer,
+ ZoneId zoneId)
{
- return new KafkaLikeShardingStrategy(
+ return new ChatMessageChannel(
+ properties.getKafka().getTopic(),
+ chatMessageChannelProducer,
+ chatMessageChannelConsumer,
+ zoneId,
properties.getKafka().getNumPartitions());
}
+
+ @Bean
+ ShardingStrategy shardingStrategy(ChatBackendProperties properties)
+ {
+ return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
+ }
+
+ @Bean
+ ZoneId zoneId()
+ {
+ return ZoneId.systemDefault();
+ }
}