From: Kai Moritz Date: Sat, 7 Jan 2023 20:05:30 +0000 (+0100) Subject: feat: The size buffer for listeners to a chatroom is configurable X-Git-Tag: TEST~51 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=ad4f1e835c713b196940d3c35e138ef509bfe237;p=demos%2Fkafka%2Fchat feat: The size buffer for listeners to a chatroom is configurable --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java index aea5f8ec..adabb922 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java @@ -41,9 +41,9 @@ public class ChatBackendConfiguration } @Bean - ChatroomFactory chatroomFactory() + ChatroomFactory chatroomFactory(ChatBackendProperties properties) { - return new InMemoryChatroomFactory(); + return new InMemoryChatroomFactory(properties.getChatroomBufferSize()); } @Bean diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index 122ac619..26149de0 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -14,4 +14,5 @@ public class ChatBackendProperties { private String datadir = Paths.get(System.getProperty("java.io.tmpdir"),"chat", "backend").toString(); private String allowedOrigins = "http://localhost:4200"; + private int chatroomBufferSize = 8; } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/Chatroom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/Chatroom.java index 8f4a7975..966a28ee 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/Chatroom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/Chatroom.java @@ -1,7 +1,6 @@ package de.juplo.kafka.chat.backend.domain; import lombok.Getter; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -11,7 +10,6 @@ import java.time.LocalDateTime; import java.util.*; -@RequiredArgsConstructor @Slf4j public class Chatroom { @@ -20,7 +18,20 @@ public class Chatroom @Getter private final String name; private final PersistenceStrategy persistence; - private final Sinks.Many sink = Sinks.many().multicast().onBackpressureBuffer(); + private final Sinks.Many sink; + + public Chatroom( + UUID id, + String name, + PersistenceStrategy persistence, + int bufferSize) + { + this.id = id; + this.name = name; + this.persistence = persistence; + this.sink = Sinks.many().multicast().onBackpressureBuffer(bufferSize); + } + synchronized public Mono addMessage( Long id, diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatroomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatroomFactory.java index 415a71f5..a405be9e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatroomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatroomFactory.java @@ -2,21 +2,24 @@ package de.juplo.kafka.chat.backend.persistence; import de.juplo.kafka.chat.backend.domain.Chatroom; import de.juplo.kafka.chat.backend.domain.ChatroomFactory; -import de.juplo.kafka.chat.backend.domain.PersistenceStrategy; import lombok.RequiredArgsConstructor; +import java.util.LinkedHashMap; import java.util.UUID; @RequiredArgsConstructor public class InMemoryChatroomFactory implements ChatroomFactory { + private final int bufferSize; + + @Override public Chatroom createChatroom(UUID id, String name) { InMemoryPersistenceStrategy persistenceStrategy = new InMemoryPersistenceStrategy(new LinkedHashMap<>()); - return new Chatroom(id, name, persistenceStrategy); + return new Chatroom(id, name, persistenceStrategy, bufferSize); } @Override @@ -25,6 +28,6 @@ public class InMemoryChatroomFactory implements ChatroomFactory