ChatroomFactory chatroomFactory,
StorageStrategy storageStrategy)
{
- return new ChatHome(
- storageStrategy.readChatrooms().collectMap(chatroom -> chatroom.getId()).block(),
- chatroomFactory);
+ return new ChatHome(chatroomFactory, storageStrategy.readChatrooms());
}
@Bean
package de.juplo.kafka.chat.backend.domain;
-import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
import java.util.*;
import java.util.stream.Stream;
-@RequiredArgsConstructor
+@Slf4j
public class ChatHome
{
private final Map<UUID, Chatroom> chatrooms;
private final ChatroomFactory factory;
+ public ChatHome(ChatroomFactory factory, Flux<Chatroom> chatroomFlux)
+ {
+ log.debug("Creating ChatHome with factory: {}", factory);
+ this.factory = factory;
+ this.chatrooms = new HashMap<>();
+ chatroomFlux.subscribe(chatroom -> chatrooms.put(chatroom.getId(), chatroom));
+ }
public Chatroom createChatroom(String name)
{
this.messages = messages;
}
+ public InMemoryPersistenceStrategy(Flux<Message> messageFlux)
+ {
+ log.debug("Creating InMemoryPersistenceStrategy");
+ messages = new LinkedHashMap<>();
+ messageFlux.subscribe(message -> persistMessage(message));
+ }
@Override
public Mono<Message> persistMessage(
String text)
{
Message message = new Message(key, (long)messages.size(), timestamp, text);
+ return Mono.justOrEmpty(persistMessage(message));
+ }
+ private Message persistMessage(Message message)
+ {
+ Message.MessageKey key = message.getKey();
Message existing = messages.get(key);
if (existing != null)
{
log.info("Message with key {} already exists; {}", key, existing);
if (!message.equals(existing))
throw new MessageMutationException(message, existing);
- return Mono.empty();
+ return null;
}
messages.put(key, message);
- return Mono
- .fromSupplier(() -> message)
- .log();
+ return message;
}
@Override
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Consumer;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class JsonFilePublisher<T> implements Publisher<T>
+{
+ private final Path path;
+ private final ObjectMapper mapper;
+ private final JavaType type;
+
+
+ @Override
+ public void subscribe(Subscriber<? super T> subscriber)
+ {
+ log.info("Reading chatrooms from {}", path);
+
+ try
+ {
+ JsonParser parser =
+ mapper.getFactory().createParser(Files.newBufferedReader(path));
+
+ if (parser.nextToken() != JsonToken.START_ARRAY)
+ {
+ throw new IllegalStateException("Expected content to be an array");
+ }
+
+ subscriber.onSubscribe(new JsonFileSubscription(subscriber, parser));
+ }
+ catch (NoSuchFileException e)
+ {
+ log.info("{} does not exist - starting with empty ChatHome", path);
+ subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of()));
+ }
+ catch (IOException | IllegalStateException e)
+ {
+ subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of((s -> s.onError(e)))));
+ }
+ }
+
+ @RequiredArgsConstructor
+ private class JsonFileSubscription implements Subscription
+ {
+ private final Subscriber<? super T> subscriber;
+ private final JsonParser parser;
+
+ @Override
+ public void request(long requested)
+ {
+ try
+ {
+ while (requested > 0 && parser.nextToken() != JsonToken.END_ARRAY)
+ {
+ subscriber.onNext(mapper.readValue(parser, type));
+ requested--;
+ }
+
+ if (requested > 0)
+ subscriber.onComplete();
+ }
+ catch (IOException e)
+ {
+ subscriber.onError(e);
+ }
+ }
+
+ @Override
+ public void cancel() {}
+ }
+
+ private class ReplaySubscription implements Subscription
+ {
+ private final Subscriber<? super T> subscriber;
+ private final Iterator<Consumer<Subscriber<? super T>>> iterator;
+
+ ReplaySubscription(
+ Subscriber<? super T> subscriber,
+ Iterable<Consumer<Subscriber<? super T>>> actions)
+ {
+ this.subscriber = subscriber;
+ this.iterator = actions.iterator();
+ }
+
+ @Override
+ public void request(long requested)
+ {
+ while (requested > 0 && iterator.hasNext())
+ {
+ iterator.next().accept(subscriber);
+ requested--;
+ }
+
+ if (requested > 0)
+ subscriber.onComplete();
+ }
+
+ @Override
+ public void cancel() {}
+ }
+}
package de.juplo.kafka.chat.backend.persistence;
import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.chat.backend.api.ChatroomTo;
import de.juplo.kafka.chat.backend.api.MessageTo;
import de.juplo.kafka.chat.backend.domain.Chatroom;
import de.juplo.kafka.chat.backend.domain.ChatroomFactory;
import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.MessageMutationException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.Sinks;
import java.io.IOException;
import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
-import java.util.LinkedHashMap;
-import java.util.function.Function;
-import java.util.stream.Collectors;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
@Override
public Flux<Chatroom> readChatrooms()
{
- Path path = chatroomsPath();
- log.info("Reading chatrooms from {}", path);
- try
- {
- JsonParser parser =
- mapper
- .getFactory()
- .createParser(Files.newBufferedReader(path));
-
- if (parser.nextToken() != JsonToken.START_ARRAY)
- throw new IllegalStateException("Expected content to be an array");
-
- Sinks.Many<ChatroomTo> many = Sinks.many().unicast().onBackpressureBuffer();
-
- while (parser.nextToken() != JsonToken.END_ARRAY)
- {
- many
- .tryEmitNext(mapper.readValue(parser, ChatroomTo.class))
- .orThrow();
- }
-
- many.tryEmitComplete().orThrow();
-
- return many
- .asFlux()
- .map(chatroomTo ->
- {
- LinkedHashMap<Message.MessageKey, Message> messages =
- readMessages(chatroomTo)
- .collect(Collectors.toMap(
- Message::getKey,
- Function.identity(),
- (existing, message) ->
- {
- if (!message.equals(existing))
- throw new MessageMutationException(message, existing);
- return existing;
- },
- LinkedHashMap::new))
- .block();
- InMemoryPersistenceStrategy strategy = new InMemoryPersistenceStrategy(messages);
- return chatroomFactory.restoreChatroom(chatroomTo.getId(), chatroomTo.getName(), strategy);
- });
- }
- catch (NoSuchFileException e)
- {
- log.info("{} does not exist - starting with empty ChatHome", path);
- return Flux.empty();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
+ JavaType type = mapper.getTypeFactory().constructType(ChatroomTo.class);
+ return Flux
+ .from(new JsonFilePublisher<ChatroomTo>(chatroomsPath(), mapper, type))
+ .log()
+ .map(chatroomTo ->
+ {
+ InMemoryPersistenceStrategy strategy =
+ new InMemoryPersistenceStrategy(readMessages(chatroomTo));
+ return chatroomFactory.restoreChatroom(chatroomTo.getId(), chatroomTo.getName(), strategy);
+ });
}
@Override
@Override
public Flux<Message> readMessages(ChatroomTo chatroomTo)
{
- Path path = chatroomPath(chatroomTo);
- log.info("Reading messages for {} from {}", chatroomTo, path);
- try
- {
- JsonParser parser =
- mapper
- .getFactory()
- .createParser(Files.newBufferedReader(path));
-
- if (parser.nextToken() != JsonToken.START_ARRAY)
- throw new IllegalStateException("Expected content to be an array");
-
- Sinks.Many<Message> many = Sinks.many().unicast().onBackpressureBuffer();
-
- while (parser.nextToken() != JsonToken.END_ARRAY)
- {
- many
- .tryEmitNext(mapper.readValue(parser, MessageTo.class).toMessage())
- .orThrow();
- }
-
- many.tryEmitComplete().orThrow();
-
- return many.asFlux();
- }
- catch (NoSuchFileException e)
- {
- log.info(
- "{} does not exist - starting with empty chat for {}",
- path,
- chatroomTo);
- return Flux.empty();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
+ JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
+ return Flux
+ .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
+ .log()
+ .map(MessageTo::toMessage);
}
Path chatroomsPath()