void putChatRoom(ChatRoom chatRoom)
{
- chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
+ Integer partition = chatRoom.getShard();
+ UUID chatRoomId = chatRoom.getId();
+ ChatRoom existingChatRoom = chatrooms[partition].get(chatRoomId);
+ if (existingChatRoom == null)
+ {
+ log.info(
+ "Creating new chat-room in partition {}: {}",
+ partition,
+ chatRoom);
+ chatrooms[partition].put(chatRoomId, chatRoom);
+ }
+ else
+ {
+ if (chatRoom.getShard() != existingChatRoom.getShard())
+ {
+ throw new IllegalArgumentException(
+ "Could not change the shard of existing chat-room " +
+ chatRoomId + " from " +
+ existingChatRoom.getShard() + " to " +
+ chatRoom.getShard());
+ }
+ else
+ {
+ log.info(
+ "Updating chat-room in partition {}: {} -> {}",
+ partition,
+ existingChatRoom,
+ chatRoom);
+ existingChatRoom.s
+ }
+ }
}
Mono<ChatRoom> getChatRoom(int shard, UUID id)
public class ChatRoomChannel implements Runnable
{
private final String topic;
- private final Consumer<String, ChatRoomTo> consumer;
- private final Producer<String, ChatRoomTo> producer;
- private final ChatRoomFactory chatRoomFactory;
+ private final Consumer<Integer, ChatRoomTo> consumer;
+ private final Producer<Integer, ChatRoomTo> producer;
+ private final ShardingStrategy shardingStrategy;
+ private final ChatMessageChannel chatMessageChannel;
private boolean running;
{
try
{
- ConsumerRecords<String, ChatRoomTo> records = consumer.poll(Duration.ofMinutes(5));
+ ConsumerRecords<Integer, ChatRoomTo> records = consumer.poll(Duration.ofMinutes(5));
log.info("Fetched {} messages", records.count());
- for (ConsumerRecord<String, ChatRoomTo> record : records)
+ for (ConsumerRecord<Integer, ChatRoomTo> record : records)
{
UUID id = record.value().getId();
String name = record.value().getName();
}
}
+ void createChatRoom()
+ {
+ log.info("Creating ChatRoom with buffer-size {}", bufferSize);
+ KafkaChatRoomService service = new KafkaChatRoomService(chatMessageChannel, id);
+ int shard = shardingStrategy.selectShard(id);
+ ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
+ chatMessageChannel.putChatRoom(chatRoom);
+ }
+
Mono<ChatRoomInfo> sendCreateChatRoomRequest(
UUID chatRoomId,
String name)
{
- ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId, name);
+ int shard = this.shardingStrategy.selectShard(chatRoomId);
+ ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId, name, shard);
return Mono.create(sink ->
{
- ProducerRecord<String, ChatRoomTo> record =
+ ProducerRecord<Integer, ChatRoomTo> record =
new ProducerRecord<>(
topic,
- chatRoomId.toString(),
+ shard,
chatRoomTo);
producer.send(record, ((metadata, exception) ->
package de.juplo.kafka.chat.backend.persistence.kafka;
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import lombok.AllArgsConstructor;
import lombok.Data;
{
private UUID id;
private String name;
+ private int shard;
public ChatRoomInfo toChatRoomInfo()
{
- return new ChatRoomInfo(id, name, -1);
+ return new ChatRoomInfo(id, name, shard);
+ }
+
+ public static ChatRoomTo from(ChatRoom chatRoom)
+ {
+ return ChatRoomTo.of(chatRoom.getId(), chatRoom.getName(), chatRoom.getShard());
}
}
package de.juplo.kafka.chat.backend.persistence.kafka;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
-import java.time.Clock;
import java.util.UUID;
@Slf4j
public class KafkaChatRoomFactory implements ChatRoomFactory
{
- private final int bufferSize;
- private final ShardingStrategy shardingStrategy;
- private final Clock clock;
- private final ChatMessageChannel chatMessageChannel;
+ private final ChatRoomChannel chatRoomChannel;
@Override
public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
{
- log.info("Creating ChatRoom with buffer-size {}", bufferSize);
- KafkaChatRoomService service = new KafkaChatRoomService(chatMessageChannel, id);
- int shard = shardingStrategy.selectShard(id);
- ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
- chatMessageChannel.putChatRoom(chatRoom);
- return Mono.just(chatRoom);
+ log.info("Sending create-request for chat rooom: id={}, name={}");
+ return chatRoomChannel.sendCreateChatRoomRequest(id, name);
}
}