From 8dc6050f497c742e19cf5363214cf452f31f0017 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 19 Apr 2023 16:59:01 +0200 Subject: [PATCH] NEU --- .../persistence/kafka/ChatRoomChannel.java | 58 ++++++------------- .../backend/persistence/kafka/ChatRoomTo.java | 10 +--- .../kafka/KafkaChatRoomFactory.java | 10 +++- .../kafka/KafkaChatRoomService.java | 2 +- 4 files changed, 27 insertions(+), 53 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java index 0c553e69..da04554e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java @@ -1,10 +1,9 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.*; -import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.Producer; @@ -12,48 +11,25 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.*; -import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.UUID; -import java.util.stream.IntStream; +@RequiredArgsConstructor @Slf4j public class ChatRoomChannel implements Runnable { private final String topic; - private final Consumer consumer; - private final Producer producer; - private final ShardingStrategy shardingStrategy; - private final ChatMessageChannel chatMessageChannel; + private final Consumer consumer; + private final Producer producer; + private final ChatRoomFactory chatRoomFactory; private boolean running; - public ChatRoomChannel( - String topic, - Consumer consumer, - Producer producer, - int numShards, - ChatMessageChannel chatMessageChannel) - { - log.debug( - "Creating ChatRoomChannel for topic {} with sharding for {} partitions", - topic, - numShards); - this.topic = topic; - this.consumer = consumer; - this.producer = producer; - this.shardingStrategy = new KafkaLikeShardingStrategy(numShards); - this.chatMessageChannel = chatMessageChannel; - } - - @Override public void run() { @@ -65,9 +41,15 @@ public class ChatRoomChannel implements Runnable { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); log.info("Fetched {} messages", records.count()); + for (ConsumerRecord record : records) + { + UUID id = record.value().getId(); + String name = record.value().getName(); + chatRoomFactory.createChatRoom(id, name); + } } catch (WakeupException e) { @@ -78,37 +60,33 @@ public class ChatRoomChannel implements Runnable } } - Mono sendCreateChatRoomRequest( UUID chatRoomId, String name) { - int shard = this.shardingStrategy.selectShard(chatRoomId); - ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId, name, shard); + ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId, name); return Mono.create(sink -> { - ProducerRecord record = + ProducerRecord record = new ProducerRecord<>( topic, - shard, + chatRoomId.toString(), chatRoomTo); producer.send(record, ((metadata, exception) -> { if (metadata != null) { - log.info("Successfully send chreate-request {}", chatRoomTo); + log.info("Successfully send chreate-request for chat room: {}", chatRoomTo); sink.success(chatRoomTo.toChatRoomInfo()); } else { // On send-failure log.error( - "Could not create-request for chat-room={}, key={}, timestamp={}, text={}: {}", + "Could not send create-request for chat room (id={}, name={}): {}", chatRoomId, - key, - timestamp, - text, + name, exception); sink.error(exception); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java index 3e27a403..4b85f1c5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java @@ -1,12 +1,10 @@ package de.juplo.kafka.chat.backend.persistence.kafka; -import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; -import java.time.Clock; import java.util.UUID; @@ -17,15 +15,9 @@ public class ChatRoomTo { private UUID id; private String name; - private int shard; public ChatRoomInfo toChatRoomInfo() { - return new ChatRoomInfo(id, name, shard); - } - - public static ChatRoomTo from(ChatRoom chatRoom) - { - return ChatRoomTo.of(chatRoom.getId(), chatRoom.getName(), chatRoom.getShard()); + return new ChatRoomInfo(id, name, -1); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java index 23bd921d..ad9fe9f8 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java @@ -16,15 +16,19 @@ import java.util.UUID; @Slf4j public class KafkaChatRoomFactory implements ChatRoomFactory { - private final ChatRoomChannel chatRoomChannel; + private final int bufferSize; + private final ShardingStrategy shardingStrategy; + private final Clock clock; + private final ChatMessageChannel chatMessageChannel; @Override public Mono createChatRoom(UUID id, String name) { log.info("Creating ChatRoom with buffer-size {}", bufferSize); - KafkaChatRoomService service = new KafkaChatRoomService(chatRoomChannel, id); + KafkaChatRoomService service = new KafkaChatRoomService(chatMessageChannel, id); + int shard = shardingStrategy.selectShard(id); ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); - chatRoomChannel.putChatRoom(chatRoom); + chatMessageChannel.putChatRoom(chatRoom); return Mono.just(chatRoom); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java index 07e3fe45..09861946 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java @@ -32,7 +32,7 @@ public class KafkaChatRoomService implements ChatRoomService .doOnSuccess(message -> persistMessage(message)); } - public void persistMessage(Message message) + void persistMessage(Message message) { messages.put(message.getKey(), message); } -- 2.20.1