package de.juplo.kafka.chat.backend.domain;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.*;
-@RequiredArgsConstructor
@Slf4j
public class Chatroom
{
@Getter
private final String name;
private final PersistenceStrategy persistence;
- private final Sinks.Many<Message> sink = Sinks.many().multicast().onBackpressureBuffer();
+ private final Sinks.Many<Message> sink;
+
+ public Chatroom(
+ UUID id,
+ String name,
+ PersistenceStrategy persistence,
+ int bufferSize)
+ {
+ this.id = id;
+ this.name = name;
+ this.persistence = persistence;
+ this.sink = Sinks.many().multicast().onBackpressureBuffer(bufferSize);
+ }
+
synchronized public Mono<Message> addMessage(
Long id,
import de.juplo.kafka.chat.backend.domain.Chatroom;
import de.juplo.kafka.chat.backend.domain.ChatroomFactory;
-import de.juplo.kafka.chat.backend.domain.PersistenceStrategy;
import lombok.RequiredArgsConstructor;
+import java.util.LinkedHashMap;
import java.util.UUID;
@RequiredArgsConstructor
public class InMemoryChatroomFactory implements ChatroomFactory<InMemoryPersistenceStrategy>
{
+ private final int bufferSize;
+
+
@Override
public Chatroom createChatroom(UUID id, String name)
{
InMemoryPersistenceStrategy persistenceStrategy =
new InMemoryPersistenceStrategy(new LinkedHashMap<>());
- return new Chatroom(id, name, persistenceStrategy);
+ return new Chatroom(id, name, persistenceStrategy, bufferSize);
}
@Override
String name,
InMemoryPersistenceStrategy persistenceStrategy)
{
- return new Chatroom(id, name, persistenceStrategy);
+ return new Chatroom(id, name, persistenceStrategy, bufferSize);
}
}