From 13c51b4630177e7f6649500a3d4b876a12509af6 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 16 Apr 2023 20:00:31 +0200 Subject: [PATCH] NEU --- .../chat/backend/domain/ChatRoomFactory.java | 2 +- .../inmemory/InMemoryChatRoomFactory.java | 2 +- .../persistence/kafka/ChatMessageChannel.java | 264 ++++++++++++++++++ .../persistence/kafka/ChatRoomChannel.java | 118 ++++++++ .../backend/persistence/kafka/ChatRoomTo.java | 8 +- .../kafka/KafkaChatHomeService.java | 247 +--------------- .../kafka/KafkaChatRoomFactory.java | 15 +- .../kafka/KafkaChatRoomService.java | 8 +- 8 files changed, 404 insertions(+), 260 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java index 324e4b02..603795d9 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java @@ -7,5 +7,5 @@ import java.util.UUID; public interface ChatRoomFactory { - Mono createChatRoom(UUID id, String name); + Mono createChatRoom(UUID id, String name); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java index 9872ccb1..2bde2361 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java @@ -21,7 +21,7 @@ public class InMemoryChatRoomFactory implements ChatRoomFactory @Override - public Mono createChatRoom(UUID id, String name) + public Mono createChatRoom(UUID id, String name) { log.info("Creating ChatRoom with buffer-size {}", bufferSize); int shard = shardingStrategy.selectShard(id); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java new file mode 100644 index 00000000..76204617 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java @@ -0,0 +1,264 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RecordDeserializationException; +import org.apache.kafka.common.errors.WakeupException; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.*; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.IntStream; + + +@Slf4j +public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener +{ + private final String topic; + private final Consumer consumer; + private final Producer producer; + private final ZoneId zoneId; + private final int numShards; + private final boolean[] isShardOwned; + private final long[] currentOffset; + private final long[] nextOffset; + private final Map[] chatrooms; + private final KafkaLikeShardingStrategy shardingStrategy; + + private boolean running; + @Getter + private volatile boolean loadInProgress; + + + public ChatMessageChannel( + String topic, + Consumer consumer, + Producer producer, + ZoneId zoneId, + int numShards) + { + log.debug( + "Creating ChatMessageChannel for topic {} with {} partitions", + topic, + numShards); + this.topic = topic; + this.consumer = consumer; + this.producer = producer; + this.zoneId = zoneId; + this.numShards = numShards; + this.isShardOwned = new boolean[numShards]; + this.currentOffset = new long[numShards]; + this.nextOffset = new long[numShards]; + this.chatrooms = new Map[numShards]; + this.shardingStrategy = new KafkaLikeShardingStrategy(numShards); + } + + + @Override + public void onPartitionsAssigned(Collection partitions) + { + log.info("Newly assigned partitions! Pausing normal operations..."); + loadInProgress = true; + + consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) -> + { + int partition = topicPartition.partition(); + isShardOwned[partition] = true; + this.currentOffset[partition] = currentOffset; + + log.info( + "Partition assigned: {} - loading messages: next={} -> current={}", + partition, + nextOffset[partition], + currentOffset); + + consumer.seek(topicPartition, nextOffset[partition]); + }); + + consumer.resume(partitions); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitions.forEach(topicPartition -> + { + int partition = topicPartition.partition(); + isShardOwned[partition] = false; + log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); + }); + } + + @Override + public void onPartitionsLost(Collection partitions) + { + log.warn("Lost partitions: {}, partitions"); + // TODO: Muss auf den Verlust anders reagiert werden? + onPartitionsRevoked(partitions); + } + + @Override + public void run() + { + consumer.subscribe(List.of(topic)); + + running = true; + + while (running) + { + try + { + ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + log.info("Fetched {} messages", records.count()); + + if (loadInProgress) + { + loadMessages(records); + + if (isLoadingCompleted()) + { + log.info("Loading of messages completed! Pausing all owned partitions..."); + pauseAllOwnedPartions(); + log.info("Resuming normal operations..."); + loadInProgress = false; + } + } + else + { + if (!records.isEmpty()) + { + throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!"); + } + } + } + catch (WakeupException e) + { + } + catch (RecordDeserializationException e) + { + } + } + } + + void loadMessages(ConsumerRecords records) + { + for (ConsumerRecord record : records) + { + nextOffset[record.partition()] = record.offset() + 1; + UUID chatRoomId = UUID.fromString(record.key()); + MessageTo messageTo = record.value(); + + Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId()); + + Instant instant = Instant.ofEpochSecond(record.timestamp()); + LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); + + Message message = new Message(key, record.offset(), timestamp, messageTo.getText()); + + ChatRoom chatRoom = chatrooms[record.partition()].get(chatRoomId); + if (chatRoom == null) + { + // Alles pausieren und erst von putChatRoom wieder resumen lassen! + } + KafkaChatRoomService kafkaChatRoomService = + (KafkaChatRoomService) chatRoom.getChatRoomService(); + + kafkaChatRoomService.persistMessage(message); + } + } + + boolean isLoadingCompleted() + { + return IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .mapToObj(shard -> nextOffset[shard] >= currentOffset[shard]) + .collect( + () -> Boolean.TRUE, // TODO: Boolean is immutable + (acc, v) -> Boolean.valueOf(acc && v), // TODO: Boolean is immutable + (a, b) -> Boolean.valueOf(a && b)); // TODO: Boolean is immutable + } + + void pauseAllOwnedPartions() + { + consumer.pause(IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .mapToObj(shard -> new TopicPartition(topic, shard)) + .toList()); + } + + Mono sendMessage( + UUID chatRoomId, + Message.MessageKey key, + LocalDateTime timestamp, + String text) + { + int shard = this.shardingStrategy.selectShard(chatRoomId); + TopicPartition tp = new TopicPartition(topic, shard); + ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + tp.topic(), + tp.partition(), + zdt.toEpochSecond(), + chatRoomId.toString(), + MessageTo.of(key.getUsername(), key.getMessageId(), text)); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + // On successful send + Message message = new Message(key, metadata.offset(), timestamp, text); + log.info("Successfully send message {}", message); + sink.success(message); + } + else + { + // On send-failure + log.error( + "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}", + chatRoomId, + key, + timestamp, + text, + exception); + sink.error(exception); + } + })); + }); + } + + + void putChatRoom(ChatRoom chatRoom) + { + chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom); + } + + Mono getChatRoom(int shard, UUID id) + { + return Mono.justOrEmpty(chatrooms[shard].get(id)); + } + + Flux getChatRooms(int shard) + { + return Flux.fromStream(chatrooms[shard].values().stream()); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java new file mode 100644 index 00000000..0c553e69 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java @@ -0,0 +1,118 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.*; +import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RecordDeserializationException; +import org.apache.kafka.common.errors.WakeupException; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.*; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.IntStream; + + +@Slf4j +public class ChatRoomChannel implements Runnable +{ + private final String topic; + private final Consumer consumer; + private final Producer producer; + private final ShardingStrategy shardingStrategy; + private final ChatMessageChannel chatMessageChannel; + + private boolean running; + + + public ChatRoomChannel( + String topic, + Consumer consumer, + Producer producer, + int numShards, + ChatMessageChannel chatMessageChannel) + { + log.debug( + "Creating ChatRoomChannel for topic {} with sharding for {} partitions", + topic, + numShards); + this.topic = topic; + this.consumer = consumer; + this.producer = producer; + this.shardingStrategy = new KafkaLikeShardingStrategy(numShards); + this.chatMessageChannel = chatMessageChannel; + } + + + @Override + public void run() + { + consumer.assign(List.of(new TopicPartition(topic, 0))); + + running = true; + + while (running) + { + try + { + ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + log.info("Fetched {} messages", records.count()); + + } + catch (WakeupException e) + { + } + catch (RecordDeserializationException e) + { + } + } + } + + + Mono sendCreateChatRoomRequest( + UUID chatRoomId, + String name) + { + int shard = this.shardingStrategy.selectShard(chatRoomId); + ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId, name, shard); + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + topic, + shard, + chatRoomTo); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + log.info("Successfully send chreate-request {}", chatRoomTo); + sink.success(chatRoomTo.toChatRoomInfo()); + } + else + { + // On send-failure + log.error( + "Could not create-request for chat-room={}, key={}, timestamp={}, text={}: {}", + chatRoomId, + key, + timestamp, + text, + exception); + sink.error(exception); + } + })); + }); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java index 9aa60947..3e27a403 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java @@ -1,6 +1,7 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -18,12 +19,9 @@ public class ChatRoomTo private String name; private int shard; - public ChatRoom toChatRoom( - Clock clock, - KafkaChatRoomService service, - int bufferSize) + public ChatRoomInfo toChatRoomInfo() { - return new ChatRoom(id, name, shard, clock, service, bufferSize); + return new ChatRoomInfo(id, name, shard); } public static ChatRoomTo from(ChatRoom chatRoom) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java index e23f08db..38aecd18 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java @@ -2,281 +2,50 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException; -import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.RecordDeserializationException; -import org.apache.kafka.common.errors.WakeupException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.time.*; import java.util.*; -import java.util.stream.IntStream; @Slf4j -public class KafkaChatHomeService implements ChatHomeService, Runnable, ConsumerRebalanceListener +public class KafkaChatHomeService implements ChatHomeService { - private final String chatRoomsTopic; - private final Consumer chatRoomsConsumer; - private final Producer chatRoomsProducer; - private final String chatMessagesTopic; - private final Consumer chatMessagesConsumer; - private final Producer chatMessagesProducer; - private final ZoneId zoneId; - private final int numShards; - private final boolean[] isShardOwned; - private final long[] currentOffset; - private final long[] nextOffset; - private final Map[] chatrooms; - private final KafkaLikeShardingStrategy shardingStrategy; + private final ChatMessageChannel chatMessageChanel; - private boolean running; - private volatile boolean loadInProgress; - - public KafkaChatHomeService( - String chatRoomsTopic, - Consumer chatRoomsConsumer, - Producer chatRoomsProducer, - String chatMessagesTopic, - Consumer chatMessagesConsumer, - Producer chatMessagesProducer, - ZoneId zoneId, - int numShards) + public KafkaChatHomeService(ChatMessageChannel chatMessageChannel) { log.debug("Creating KafkaChatHomeService"); - this.chatRoomsTopic = chatRoomsTopic; - this.chatRoomsConsumer = chatRoomsConsumer; - this.chatRoomsProducer = chatRoomsProducer; - this.chatMessagesTopic = chatMessagesTopic; - this.chatMessagesConsumer = chatMessagesConsumer; - this.chatMessagesProducer = chatMessagesProducer; - this.zoneId = zoneId; - this.numShards = numShards; - this.isShardOwned = new boolean[numShards]; - this.currentOffset = new long[numShards]; - this.nextOffset = new long[numShards]; - this.chatrooms = new Map[numShards]; - this.shardingStrategy = new KafkaLikeShardingStrategy(numShards); - } - - - @Override - public void onPartitionsAssigned(Collection partitions) - { - log.info("Newly assigned partitions! Pausing normal operations..."); - loadInProgress = true; - - chatMessagesConsumer.endOffsets(partitions).forEach((topicPartition, currentOffset) -> - { - int partition = topicPartition.partition(); - isShardOwned[partition] = true; - this.currentOffset[partition] = currentOffset; - - log.info( - "Partition assigned: {} - loading messages: next={} -> current={}", - partition, - nextOffset[partition], - currentOffset); - - chatMessagesConsumer.seek(topicPartition, nextOffset[partition]); - }); - - chatMessagesConsumer.resume(partitions); - } - - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions.forEach(topicPartition -> - { - int partition = topicPartition.partition(); - isShardOwned[partition] = false; - log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); - }); + this.chatMessageChanel = chatMessageChannel; } - @Override - public void onPartitionsLost(Collection partitions) - { - log.warn("Lost partitions: {}, partitions"); - // TODO: Muss auf den Verlust anders reagiert werden? - onPartitionsRevoked(partitions); - } - - @Override - public void run() - { - chatMessagesConsumer.subscribe(List.of(chatMessagesTopic)); - - running = true; - - while (running) - { - try - { - ConsumerRecords records = chatMessagesConsumer.poll(Duration.ofMinutes(5)); - log.info("Fetched {} messages", records.count()); - - if (loadInProgress) - { - loadMessages(records); - - if (isLoadingCompleted()) - { - log.info("Loading of messages completed! Pausing all owned partitions..."); - pauseAllOwnedPartions(); - log.info("Resuming normal operations..."); - loadInProgress = false; - } - } - else - { - if (!records.isEmpty()) - { - throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!"); - } - } - } - catch (WakeupException e) - { - } - catch (RecordDeserializationException e) - { - } - } - } - - void loadMessages(ConsumerRecords records) - { - for (ConsumerRecord record : records) - { - nextOffset[record.partition()] = record.offset() + 1; - UUID chatRoomId = UUID.fromString(record.key()); - MessageTo messageTo = record.value(); - - Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId()); - - Instant instant = Instant.ofEpochSecond(record.timestamp()); - LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); - - Message message = new Message(key, record.offset(), timestamp, messageTo.getText()); - - ChatRoom chatRoom = chatrooms[record.partition()].get(chatRoomId); - KafkaChatRoomService kafkaChatRoomService = - (KafkaChatRoomService) chatRoom.getChatRoomService(); - - kafkaChatRoomService.persistMessage(message); - } - } - - boolean isLoadingCompleted() - { - return IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .mapToObj(shard -> nextOffset[shard] >= currentOffset[shard]) - .collect( - () -> Boolean.TRUE, - (acc, v) -> Boolean.valueOf(acc && v), - (a, b) -> Boolean.valueOf(a && b)); - } - - void pauseAllOwnedPartions() - { - chatMessagesConsumer.pause(IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .mapToObj(shard -> new TopicPartition(chatMessagesTopic, shard)) - .toList()); - } - - Mono sendMessage( - UUID chatRoomId, - Message.MessageKey key, - LocalDateTime timestamp, - String text) - { - int shard = this.shardingStrategy.selectShard(chatRoomId); - TopicPartition tp = new TopicPartition(chatMessagesTopic, shard); - ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); - return Mono.create(sink -> - { - ProducerRecord record = - new ProducerRecord<>( - tp.topic(), - tp.partition(), - zdt.toEpochSecond(), - chatRoomId.toString(), - MessageTo.of(key.getUsername(), key.getMessageId(), text)); - - chatMessagesProducer.send(record, ((metadata, exception) -> - { - if (metadata != null) - { - // On successful send - Message message = new Message(key, metadata.offset(), timestamp, text); - log.info("Successfully send message {}", message); - sink.success(message); - } - else - { - // On send-failure - log.error( - "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}", - chatRoomId, - key, - timestamp, - text, - exception); - sink.error(exception); - } - })); - }); - } - - - public void putChatRoom(ChatRoom chatRoom) - { - - ProducerRecord record = new ProducerRecord<>(chatRoom.getShard(), ); - // TODO: Nachricht senden! - chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom); - } @Override public Mono getChatRoom(int shard, UUID id) { - if (loadInProgress) + if (chatMessageChanel.isLoadInProgress()) { throw new ShardNotOwnedException(shard); } else { - return Mono.justOrEmpty(chatrooms[shard].get(id)); + return chatMessageChanel.getChatRoom(shard, id); } } @Override public Flux getChatRooms(int shard) { - if (loadInProgress) + if (chatMessageChanel.isLoadInProgress()) { throw new ShardNotOwnedException(shard); } else { - return Flux.fromStream(chatrooms[shard].values().stream()); + return chatMessageChanel.getChatRooms(shard); } } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java index f81d21f1..23bd921d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java @@ -2,7 +2,7 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; -import de.juplo.kafka.chat.backend.domain.ChatRoomService; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -16,18 +16,15 @@ import java.util.UUID; @Slf4j public class KafkaChatRoomFactory implements ChatRoomFactory { - private final KafkaChatHomeService kafkaChatHomeService; - private final ShardingStrategy shardingStrategy; - private final Clock clock; - private final int bufferSize; + private final ChatRoomChannel chatRoomChannel; @Override - public Mono createChatRoom(UUID id, String name) + public Mono createChatRoom(UUID id, String name) { log.info("Creating ChatRoom with buffer-size {}", bufferSize); - int shard = shardingStrategy.selectShard(id); - KafkaChatRoomService service = new KafkaChatRoomService(kafkaChatHomeService, id); + KafkaChatRoomService service = new KafkaChatRoomService(chatRoomChannel, id); ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); - kafkaChatHomeService.putChatRoom(chatRoom); + chatRoomChannel.putChatRoom(chatRoom); + return Mono.just(chatRoom); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java index 16ed3a70..07e3fe45 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java @@ -15,7 +15,7 @@ import java.util.UUID; @Slf4j public class KafkaChatRoomService implements ChatRoomService { - private final KafkaChatHomeService kafkaChatHomeService; + private final ChatMessageChannel chatMessageChannel; private final UUID chatRoomId; private final LinkedHashMap messages = new LinkedHashMap<>(); @@ -27,21 +27,19 @@ public class KafkaChatRoomService implements ChatRoomService LocalDateTime timestamp, String text) { - return kafkaChatHomeService + return chatMessageChannel .sendMessage(chatRoomId, key, timestamp, text) .doOnSuccess(message -> persistMessage(message)); } public void persistMessage(Message message) { - messages.put(message.getKey(), message) + messages.put(message.getKey(), message); } @Override synchronized public Mono getMessage(Message.MessageKey key) { - // TODO: Aufrufe, auf eine Nachricht (einge gewisse Zeit) warten lassen - // und dann bedienen, wenn der der Callback vom Producer aufgerufen wird? return Mono.fromSupplier(() -> messages.get(key)); } -- 2.20.1