refactor: Moved classes in package `persistence` in sub-packages -- Move
authorKai Moritz <kai@juplo.de>
Mon, 9 Jan 2023 21:18:18 +0000 (22:18 +0100)
committerKai Moritz <kai@juplo.de>
Mon, 9 Jan 2023 21:22:54 +0000 (22:22 +0100)
src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatHomeService.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatRoomService.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/JsonFilePublisher.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/filestorage/FileStorageStrategy.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/filestorage/JsonFilePublisher.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomService.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFileStorageStrategyIT.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategyIT.java [deleted file]

diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatHomeService.java
deleted file mode 100644 (file)
index b74f17f..0000000
+++ /dev/null
@@ -1,57 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence;
-
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.time.Clock;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.UUID;
-
-
-@Slf4j
-public class InMemoryChatHomeService implements ChatHomeService<InMemoryChatRoomService>
-{
-  private final Map<UUID, ChatRoom> chatrooms;
-  private final Clock clock;
-  private final int bufferSize;
-
-
-  public InMemoryChatHomeService(
-      Flux<ChatRoom> chatroomFlux,
-      Clock clock,
-      int bufferSize)
-  {
-    log.debug("Creating ChatHomeService with buffer-size {} (for created ChatRoom's)", bufferSize);
-    this.chatrooms = new HashMap<>();
-    chatroomFlux.subscribe(chatroom -> chatrooms.put(chatroom.getId(), chatroom));
-    this.clock = clock;
-    this.bufferSize = bufferSize;
-  }
-
-  @Override
-  public Mono<ChatRoom> createChatRoom(UUID id, String name)
-  {
-    InMemoryChatRoomService service =
-        new InMemoryChatRoomService(new LinkedHashMap<>());
-    ChatRoom chatRoom = new ChatRoom(id, name, clock, service, bufferSize);
-    chatrooms.put(chatRoom.getId(), chatRoom);
-    return Mono.just(chatRoom);
-  }
-
-  @Override
-  public Mono<ChatRoom> getChatRoom(UUID id)
-  {
-    return Mono.justOrEmpty(chatrooms.get(id));
-  }
-
-  @Override
-  public Flux<ChatRoom> getChatRooms()
-  {
-    return Flux.fromStream(chatrooms.values().stream());
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/InMemoryChatRoomService.java
deleted file mode 100644 (file)
index 49d400b..0000000
+++ /dev/null
@@ -1,60 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.ChatRoomService;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.time.LocalDateTime;
-import java.util.LinkedHashMap;
-
-
-@Slf4j
-public class InMemoryChatRoomService implements ChatRoomService
-{
-  private final LinkedHashMap<Message.MessageKey, Message> messages;
-
-
-  public InMemoryChatRoomService(LinkedHashMap<Message.MessageKey, Message> messages)
-  {
-    this.messages = messages;
-  }
-
-  public InMemoryChatRoomService(Flux<Message> messageFlux)
-  {
-    log.debug("Creating InMemoryChatroomService");
-    messages = new LinkedHashMap<>();
-    messageFlux.subscribe(message -> messages.put(message.getKey(), message));
-  }
-
-  @Override
-  public Message persistMessage(
-      Message.MessageKey key,
-      LocalDateTime timestamp,
-      String text)
-  {
-    Message message = new Message(key, (long)messages.size(), timestamp, text);
-    messages.put(message.getKey(), message);
-    return message;
-  }
-
-  @Override
-  public Mono<Message> getMessage(Message.MessageKey key)
-  {
-    return Mono.fromSupplier(() -> messages.get(key));
-  }
-
-  @Override
-  public Flux<Message> getMessages(long first, long last)
-  {
-    return Flux.fromStream(messages
-        .values()
-        .stream()
-        .filter(message ->
-        {
-          long serial = message.getSerialNumber();
-          return serial >= first && serial <= last;
-        }));
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/JsonFilePublisher.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/JsonFilePublisher.java
deleted file mode 100644 (file)
index e502539..0000000
+++ /dev/null
@@ -1,118 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence;
-
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.Subscription;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
-import java.nio.file.Path;
-import java.util.Iterator;
-import java.util.List;
-import java.util.function.Consumer;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class JsonFilePublisher<T> implements Publisher<T>
-{
-  private final Path path;
-  private final ObjectMapper mapper;
-  private final JavaType type;
-
-
-  @Override
-  public void subscribe(Subscriber<? super T> subscriber)
-  {
-    log.info("Reading chatrooms from {}", path);
-
-    try
-    {
-      JsonParser parser =
-          mapper.getFactory().createParser(Files.newBufferedReader(path));
-
-      if (parser.nextToken() != JsonToken.START_ARRAY)
-      {
-        throw new IllegalStateException("Expected content to be an array");
-      }
-
-      subscriber.onSubscribe(new JsonFileSubscription(subscriber, parser));
-    }
-    catch (NoSuchFileException e)
-    {
-      log.info("{} does not exist - starting with empty ChatHome", path);
-      subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of()));
-    }
-    catch (IOException | IllegalStateException e)
-    {
-      subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of((s -> s.onError(e)))));
-    }
-  }
-
-  @RequiredArgsConstructor
-  private class JsonFileSubscription implements Subscription
-  {
-    private final Subscriber<? super T> subscriber;
-    private final JsonParser parser;
-
-    @Override
-    public void request(long requested)
-    {
-      try
-      {
-        while (requested > 0 && parser.nextToken() != JsonToken.END_ARRAY)
-        {
-          subscriber.onNext(mapper.readValue(parser, type));
-          requested--;
-        }
-
-        if (requested > 0)
-          subscriber.onComplete();
-      }
-      catch (IOException e)
-      {
-        subscriber.onError(e);
-      }
-    }
-
-    @Override
-    public void cancel() {}
-  }
-
-  private class ReplaySubscription implements Subscription
-  {
-    private final Subscriber<? super T> subscriber;
-    private final Iterator<Consumer<Subscriber<? super T>>> iterator;
-
-    ReplaySubscription(
-        Subscriber<? super T> subscriber,
-        Iterable<Consumer<Subscriber<? super T>>> actions)
-    {
-      this.subscriber = subscriber;
-      this.iterator = actions.iterator();
-    }
-
-    @Override
-    public void request(long requested)
-    {
-      while (requested > 0 && iterator.hasNext())
-      {
-        iterator.next().accept(subscriber);
-        requested--;
-      }
-
-      if (requested > 0)
-        subscriber.onComplete();
-    }
-
-    @Override
-    public void cancel() {}
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategy.java
deleted file mode 100644 (file)
index 7b490bf..0000000
+++ /dev/null
@@ -1,194 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.chat.backend.api.ChatRoomTo;
-import de.juplo.kafka.chat.backend.api.MessageTo;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.Message;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.time.Clock;
-
-import static java.nio.file.StandardOpenOption.CREATE;
-import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class LocalJsonFilesStorageStrategy implements StorageStrategy
-{
-  public static final String CHATROOMS_FILENAME = "chatrooms.json";
-
-
-  private final Path storagePath;
-  private final Clock clock;
-  private final int bufferSize;
-  private final ObjectMapper mapper;
-
-
-  @Override
-  public void writeChatrooms(Flux<ChatRoom> chatroomFlux)
-  {
-    Path path = chatroomsPath();
-    log.info("Writing chatrooms to {}", path);
-    try
-    {
-      Files.createDirectories(storagePath);
-
-      JsonGenerator generator =
-          mapper
-              .getFactory()
-              .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
-
-      chatroomFlux
-          .log()
-          .doFirst(() ->
-          {
-            try
-            {
-              generator.useDefaultPrettyPrinter();
-              generator.writeStartArray();
-            }
-            catch (IOException e)
-            {
-              throw new RuntimeException(e);
-            }
-          })
-          .doOnTerminate(() ->
-          {
-            try
-            {
-              generator.writeEndArray();
-              generator.close();
-            }
-            catch (IOException e)
-            {
-              throw new RuntimeException(e);
-            }
-          })
-          .subscribe(chatroom ->
-          {
-            try
-            {
-              ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom);
-              generator.writeObject(chatroomTo);
-              writeMessages(chatroomTo, chatroom.getMessages());
-            }
-            catch (IOException e)
-            {
-              throw new RuntimeException(e);
-            }
-          });
-    }
-    catch (IOException e)
-    {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public Flux<ChatRoom> readChatrooms()
-  {
-    JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class);
-    return Flux
-        .from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
-        .log()
-        .map(chatRoomTo ->
-        {
-          InMemoryChatRoomService chatroomService =
-              new InMemoryChatRoomService(readMessages(chatRoomTo));
-          return new ChatRoom(
-              chatRoomTo.getId(),
-              chatRoomTo.getName(),
-              clock,
-              chatroomService,
-              bufferSize);
-        });
-  }
-
-  @Override
-  public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
-  {
-    Path path = chatroomPath(chatroomTo);
-    log.info("Writing messages for {} to {}", chatroomTo, path);
-    try
-    {
-      Files.createDirectories(storagePath);
-
-      JsonGenerator generator =
-          mapper
-              .getFactory()
-              .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
-
-      messageFlux
-          .log()
-          .doFirst(() ->
-          {
-            try
-            {
-              generator.useDefaultPrettyPrinter();
-              generator.writeStartArray();
-            }
-            catch (IOException e)
-            {
-              throw new RuntimeException(e);
-            }
-          })
-          .doOnTerminate(() ->
-          {
-            try
-            {
-              generator.writeEndArray();
-              generator.close();
-            }
-            catch (IOException e)
-            {
-              throw new RuntimeException(e);
-            }
-          })
-          .subscribe(message ->
-          {
-            try
-            {
-              MessageTo messageTo = MessageTo.from(message);
-              generator.writeObject(messageTo);
-            }
-            catch (IOException e)
-            {
-              throw new RuntimeException(e);
-            }
-          });
-    }
-    catch (IOException e)
-    {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public Flux<Message> readMessages(ChatRoomTo chatroomTo)
-  {
-    JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
-    return Flux
-        .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
-        .log()
-        .map(MessageTo::toMessage);
-  }
-
-  Path chatroomsPath()
-  {
-    return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
-  }
-
-  Path chatroomPath(ChatRoomTo chatroomTo)
-  {
-    return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/filestorage/FileStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/filestorage/FileStorageStrategy.java
new file mode 100644 (file)
index 0000000..7b490bf
--- /dev/null
@@ -0,0 +1,194 @@
+package de.juplo.kafka.chat.backend.persistence;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.api.ChatRoomTo;
+import de.juplo.kafka.chat.backend.api.MessageTo;
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Clock;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class LocalJsonFilesStorageStrategy implements StorageStrategy
+{
+  public static final String CHATROOMS_FILENAME = "chatrooms.json";
+
+
+  private final Path storagePath;
+  private final Clock clock;
+  private final int bufferSize;
+  private final ObjectMapper mapper;
+
+
+  @Override
+  public void writeChatrooms(Flux<ChatRoom> chatroomFlux)
+  {
+    Path path = chatroomsPath();
+    log.info("Writing chatrooms to {}", path);
+    try
+    {
+      Files.createDirectories(storagePath);
+
+      JsonGenerator generator =
+          mapper
+              .getFactory()
+              .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
+
+      chatroomFlux
+          .log()
+          .doFirst(() ->
+          {
+            try
+            {
+              generator.useDefaultPrettyPrinter();
+              generator.writeStartArray();
+            }
+            catch (IOException e)
+            {
+              throw new RuntimeException(e);
+            }
+          })
+          .doOnTerminate(() ->
+          {
+            try
+            {
+              generator.writeEndArray();
+              generator.close();
+            }
+            catch (IOException e)
+            {
+              throw new RuntimeException(e);
+            }
+          })
+          .subscribe(chatroom ->
+          {
+            try
+            {
+              ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom);
+              generator.writeObject(chatroomTo);
+              writeMessages(chatroomTo, chatroom.getMessages());
+            }
+            catch (IOException e)
+            {
+              throw new RuntimeException(e);
+            }
+          });
+    }
+    catch (IOException e)
+    {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Flux<ChatRoom> readChatrooms()
+  {
+    JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class);
+    return Flux
+        .from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
+        .log()
+        .map(chatRoomTo ->
+        {
+          InMemoryChatRoomService chatroomService =
+              new InMemoryChatRoomService(readMessages(chatRoomTo));
+          return new ChatRoom(
+              chatRoomTo.getId(),
+              chatRoomTo.getName(),
+              clock,
+              chatroomService,
+              bufferSize);
+        });
+  }
+
+  @Override
+  public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
+  {
+    Path path = chatroomPath(chatroomTo);
+    log.info("Writing messages for {} to {}", chatroomTo, path);
+    try
+    {
+      Files.createDirectories(storagePath);
+
+      JsonGenerator generator =
+          mapper
+              .getFactory()
+              .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
+
+      messageFlux
+          .log()
+          .doFirst(() ->
+          {
+            try
+            {
+              generator.useDefaultPrettyPrinter();
+              generator.writeStartArray();
+            }
+            catch (IOException e)
+            {
+              throw new RuntimeException(e);
+            }
+          })
+          .doOnTerminate(() ->
+          {
+            try
+            {
+              generator.writeEndArray();
+              generator.close();
+            }
+            catch (IOException e)
+            {
+              throw new RuntimeException(e);
+            }
+          })
+          .subscribe(message ->
+          {
+            try
+            {
+              MessageTo messageTo = MessageTo.from(message);
+              generator.writeObject(messageTo);
+            }
+            catch (IOException e)
+            {
+              throw new RuntimeException(e);
+            }
+          });
+    }
+    catch (IOException e)
+    {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Flux<Message> readMessages(ChatRoomTo chatroomTo)
+  {
+    JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
+    return Flux
+        .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
+        .log()
+        .map(MessageTo::toMessage);
+  }
+
+  Path chatroomsPath()
+  {
+    return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
+  }
+
+  Path chatroomPath(ChatRoomTo chatroomTo)
+  {
+    return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/filestorage/JsonFilePublisher.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/filestorage/JsonFilePublisher.java
new file mode 100644 (file)
index 0000000..e502539
--- /dev/null
@@ -0,0 +1,118 @@
+package de.juplo.kafka.chat.backend.persistence;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Consumer;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class JsonFilePublisher<T> implements Publisher<T>
+{
+  private final Path path;
+  private final ObjectMapper mapper;
+  private final JavaType type;
+
+
+  @Override
+  public void subscribe(Subscriber<? super T> subscriber)
+  {
+    log.info("Reading chatrooms from {}", path);
+
+    try
+    {
+      JsonParser parser =
+          mapper.getFactory().createParser(Files.newBufferedReader(path));
+
+      if (parser.nextToken() != JsonToken.START_ARRAY)
+      {
+        throw new IllegalStateException("Expected content to be an array");
+      }
+
+      subscriber.onSubscribe(new JsonFileSubscription(subscriber, parser));
+    }
+    catch (NoSuchFileException e)
+    {
+      log.info("{} does not exist - starting with empty ChatHome", path);
+      subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of()));
+    }
+    catch (IOException | IllegalStateException e)
+    {
+      subscriber.onSubscribe(new ReplaySubscription(subscriber, List.of((s -> s.onError(e)))));
+    }
+  }
+
+  @RequiredArgsConstructor
+  private class JsonFileSubscription implements Subscription
+  {
+    private final Subscriber<? super T> subscriber;
+    private final JsonParser parser;
+
+    @Override
+    public void request(long requested)
+    {
+      try
+      {
+        while (requested > 0 && parser.nextToken() != JsonToken.END_ARRAY)
+        {
+          subscriber.onNext(mapper.readValue(parser, type));
+          requested--;
+        }
+
+        if (requested > 0)
+          subscriber.onComplete();
+      }
+      catch (IOException e)
+      {
+        subscriber.onError(e);
+      }
+    }
+
+    @Override
+    public void cancel() {}
+  }
+
+  private class ReplaySubscription implements Subscription
+  {
+    private final Subscriber<? super T> subscriber;
+    private final Iterator<Consumer<Subscriber<? super T>>> iterator;
+
+    ReplaySubscription(
+        Subscriber<? super T> subscriber,
+        Iterable<Consumer<Subscriber<? super T>>> actions)
+    {
+      this.subscriber = subscriber;
+      this.iterator = actions.iterator();
+    }
+
+    @Override
+    public void request(long requested)
+    {
+      while (requested > 0 && iterator.hasNext())
+      {
+        iterator.next().accept(subscriber);
+        requested--;
+      }
+
+      if (requested > 0)
+        subscriber.onComplete();
+    }
+
+    @Override
+    public void cancel() {}
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java
new file mode 100644 (file)
index 0000000..b74f17f
--- /dev/null
@@ -0,0 +1,57 @@
+package de.juplo.kafka.chat.backend.persistence;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.Clock;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+
+
+@Slf4j
+public class InMemoryChatHomeService implements ChatHomeService<InMemoryChatRoomService>
+{
+  private final Map<UUID, ChatRoom> chatrooms;
+  private final Clock clock;
+  private final int bufferSize;
+
+
+  public InMemoryChatHomeService(
+      Flux<ChatRoom> chatroomFlux,
+      Clock clock,
+      int bufferSize)
+  {
+    log.debug("Creating ChatHomeService with buffer-size {} (for created ChatRoom's)", bufferSize);
+    this.chatrooms = new HashMap<>();
+    chatroomFlux.subscribe(chatroom -> chatrooms.put(chatroom.getId(), chatroom));
+    this.clock = clock;
+    this.bufferSize = bufferSize;
+  }
+
+  @Override
+  public Mono<ChatRoom> createChatRoom(UUID id, String name)
+  {
+    InMemoryChatRoomService service =
+        new InMemoryChatRoomService(new LinkedHashMap<>());
+    ChatRoom chatRoom = new ChatRoom(id, name, clock, service, bufferSize);
+    chatrooms.put(chatRoom.getId(), chatRoom);
+    return Mono.just(chatRoom);
+  }
+
+  @Override
+  public Mono<ChatRoom> getChatRoom(UUID id)
+  {
+    return Mono.justOrEmpty(chatrooms.get(id));
+  }
+
+  @Override
+  public Flux<ChatRoom> getChatRooms()
+  {
+    return Flux.fromStream(chatrooms.values().stream());
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomService.java
new file mode 100644 (file)
index 0000000..49d400b
--- /dev/null
@@ -0,0 +1,60 @@
+package de.juplo.kafka.chat.backend.persistence;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.domain.ChatRoomService;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.LocalDateTime;
+import java.util.LinkedHashMap;
+
+
+@Slf4j
+public class InMemoryChatRoomService implements ChatRoomService
+{
+  private final LinkedHashMap<Message.MessageKey, Message> messages;
+
+
+  public InMemoryChatRoomService(LinkedHashMap<Message.MessageKey, Message> messages)
+  {
+    this.messages = messages;
+  }
+
+  public InMemoryChatRoomService(Flux<Message> messageFlux)
+  {
+    log.debug("Creating InMemoryChatroomService");
+    messages = new LinkedHashMap<>();
+    messageFlux.subscribe(message -> messages.put(message.getKey(), message));
+  }
+
+  @Override
+  public Message persistMessage(
+      Message.MessageKey key,
+      LocalDateTime timestamp,
+      String text)
+  {
+    Message message = new Message(key, (long)messages.size(), timestamp, text);
+    messages.put(message.getKey(), message);
+    return message;
+  }
+
+  @Override
+  public Mono<Message> getMessage(Message.MessageKey key)
+  {
+    return Mono.fromSupplier(() -> messages.get(key));
+  }
+
+  @Override
+  public Flux<Message> getMessages(long first, long last)
+  {
+    return Flux.fromStream(messages
+        .values()
+        .stream()
+        .filter(message ->
+        {
+          long serial = message.getSerialNumber();
+          return serial >= first && serial <= last;
+        }));
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFileStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFileStorageStrategyIT.java
new file mode 100644 (file)
index 0000000..b53dd10
--- /dev/null
@@ -0,0 +1,77 @@
+package de.juplo.kafka.chat.backend.persistence;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Clock;
+import java.util.function.Supplier;
+
+
+@Slf4j
+public class LocalJsonFilesStorageStrategyIT extends AbstractStorageStrategyIT
+{
+  final static Path path = Paths.get("target","local-json-files");
+
+  final Clock clock;
+  final ObjectMapper mapper;
+  final LocalJsonFilesStorageStrategy storageStrategy;
+
+
+  public LocalJsonFilesStorageStrategyIT()
+  {
+    clock = Clock.systemDefaultZone();
+    mapper = new ObjectMapper();
+    mapper.registerModule(new JavaTimeModule());
+    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+    storageStrategy =new LocalJsonFilesStorageStrategy(path, clock, 8, mapper);
+
+  }
+
+
+  @Override
+  StorageStrategy getStorageStrategy()
+  {
+    return storageStrategy;
+  }
+
+  @Override
+  Supplier<ChatHomeService> chatHomeServiceSupplier()
+  {
+    return () -> new InMemoryChatHomeService(getStorageStrategy().readChatrooms(), clock, 8);
+  }
+
+  @BeforeEach
+  void reset() throws Exception
+  {
+    if (Files.exists(path))
+    {
+      Files
+          .walk(path)
+          .forEach(file ->
+          {
+            try
+            {
+              if (!file.equals(path))
+              {
+                log.debug("Deleting file {}", file);
+                Files.delete(file);
+              }
+            }
+            catch (IOException e)
+            {
+              throw new RuntimeException(e);
+            }
+          });
+      log.debug("Deleting data-directory {}", path);
+      Files.delete(path);
+    }
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/LocalJsonFilesStorageStrategyIT.java
deleted file mode 100644 (file)
index b53dd10..0000000
+++ /dev/null
@@ -1,77 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.BeforeEach;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.time.Clock;
-import java.util.function.Supplier;
-
-
-@Slf4j
-public class LocalJsonFilesStorageStrategyIT extends AbstractStorageStrategyIT
-{
-  final static Path path = Paths.get("target","local-json-files");
-
-  final Clock clock;
-  final ObjectMapper mapper;
-  final LocalJsonFilesStorageStrategy storageStrategy;
-
-
-  public LocalJsonFilesStorageStrategyIT()
-  {
-    clock = Clock.systemDefaultZone();
-    mapper = new ObjectMapper();
-    mapper.registerModule(new JavaTimeModule());
-    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
-    storageStrategy =new LocalJsonFilesStorageStrategy(path, clock, 8, mapper);
-
-  }
-
-
-  @Override
-  StorageStrategy getStorageStrategy()
-  {
-    return storageStrategy;
-  }
-
-  @Override
-  Supplier<ChatHomeService> chatHomeServiceSupplier()
-  {
-    return () -> new InMemoryChatHomeService(getStorageStrategy().readChatrooms(), clock, 8);
-  }
-
-  @BeforeEach
-  void reset() throws Exception
-  {
-    if (Files.exists(path))
-    {
-      Files
-          .walk(path)
-          .forEach(file ->
-          {
-            try
-            {
-              if (!file.equals(path))
-              {
-                log.debug("Deleting file {}", file);
-                Files.delete(file);
-              }
-            }
-            catch (IOException e)
-            {
-              throw new RuntimeException(e);
-            }
-          });
-      log.debug("Deleting data-directory {}", path);
-      Files.delete(path);
-    }
-  }
-}