From 716ffa6a7665496e614ce6a1671c8e49c562a4c2 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 19 Apr 2023 17:18:57 +0200 Subject: [PATCH] NEU --- .../persistence/kafka/ChatMessageChannel.java | 32 ++++++++++++++++++- .../persistence/kafka/ChatRoomChannel.java | 27 +++++++++++----- .../backend/persistence/kafka/ChatRoomTo.java | 9 +++++- .../kafka/KafkaChatRoomFactory.java | 16 ++-------- 4 files changed, 61 insertions(+), 23 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java index 76204617..230f8226 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java @@ -249,7 +249,37 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener void putChatRoom(ChatRoom chatRoom) { - chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom); + Integer partition = chatRoom.getShard(); + UUID chatRoomId = chatRoom.getId(); + ChatRoom existingChatRoom = chatrooms[partition].get(chatRoomId); + if (existingChatRoom == null) + { + log.info( + "Creating new chat-room in partition {}: {}", + partition, + chatRoom); + chatrooms[partition].put(chatRoomId, chatRoom); + } + else + { + if (chatRoom.getShard() != existingChatRoom.getShard()) + { + throw new IllegalArgumentException( + "Could not change the shard of existing chat-room " + + chatRoomId + " from " + + existingChatRoom.getShard() + " to " + + chatRoom.getShard()); + } + else + { + log.info( + "Updating chat-room in partition {}: {} -> {}", + partition, + existingChatRoom, + chatRoom); + existingChatRoom.s + } + } } Mono getChatRoom(int shard, UUID id) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java index da04554e..b08a50d2 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java @@ -23,9 +23,10 @@ import java.util.UUID; public class ChatRoomChannel implements Runnable { private final String topic; - private final Consumer consumer; - private final Producer producer; - private final ChatRoomFactory chatRoomFactory; + private final Consumer consumer; + private final Producer producer; + private final ShardingStrategy shardingStrategy; + private final ChatMessageChannel chatMessageChannel; private boolean running; @@ -41,10 +42,10 @@ public class ChatRoomChannel implements Runnable { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); log.info("Fetched {} messages", records.count()); - for (ConsumerRecord record : records) + for (ConsumerRecord record : records) { UUID id = record.value().getId(); String name = record.value().getName(); @@ -60,17 +61,27 @@ public class ChatRoomChannel implements Runnable } } + void createChatRoom() + { + log.info("Creating ChatRoom with buffer-size {}", bufferSize); + KafkaChatRoomService service = new KafkaChatRoomService(chatMessageChannel, id); + int shard = shardingStrategy.selectShard(id); + ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); + chatMessageChannel.putChatRoom(chatRoom); + } + Mono sendCreateChatRoomRequest( UUID chatRoomId, String name) { - ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId, name); + int shard = this.shardingStrategy.selectShard(chatRoomId); + ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId, name, shard); return Mono.create(sink -> { - ProducerRecord record = + ProducerRecord record = new ProducerRecord<>( topic, - chatRoomId.toString(), + shard, chatRoomTo); producer.send(record, ((metadata, exception) -> diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java index 4b85f1c5..9c196b27 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java @@ -1,5 +1,6 @@ package de.juplo.kafka.chat.backend.persistence.kafka; +import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import lombok.AllArgsConstructor; import lombok.Data; @@ -15,9 +16,15 @@ public class ChatRoomTo { private UUID id; private String name; + private int shard; public ChatRoomInfo toChatRoomInfo() { - return new ChatRoomInfo(id, name, -1); + return new ChatRoomInfo(id, name, shard); + } + + public static ChatRoomTo from(ChatRoom chatRoom) + { + return ChatRoomTo.of(chatRoom.getId(), chatRoom.getName(), chatRoom.getShard()); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java index ad9fe9f8..825f16eb 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java @@ -1,14 +1,11 @@ package de.juplo.kafka.chat.backend.persistence.kafka; -import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; -import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Mono; -import java.time.Clock; import java.util.UUID; @@ -16,19 +13,12 @@ import java.util.UUID; @Slf4j public class KafkaChatRoomFactory implements ChatRoomFactory { - private final int bufferSize; - private final ShardingStrategy shardingStrategy; - private final Clock clock; - private final ChatMessageChannel chatMessageChannel; + private final ChatRoomChannel chatRoomChannel; @Override public Mono createChatRoom(UUID id, String name) { - log.info("Creating ChatRoom with buffer-size {}", bufferSize); - KafkaChatRoomService service = new KafkaChatRoomService(chatMessageChannel, id); - int shard = shardingStrategy.selectShard(id); - ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); - chatMessageChannel.putChatRoom(chatRoom); - return Mono.just(chatRoom); + log.info("Sending create-request for chat rooom: id={}, name={}"); + return chatRoomChannel.sendCreateChatRoomRequest(id, name); } } -- 2.20.1