package de.juplo.kafka.chat.backend.persistence;
-import de.juplo.kafka.chat.backend.domain.ChatRoomData;
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
import reactor.core.publisher.Flux;
+import java.util.UUID;
+
public interface StorageStrategy
{
void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
Flux<ChatRoomInfo> readChatRoomInfo();
- void writeChatRoomData(Flux<ChatRoomData> chatRoomDataFlux);
- Flux<ChatRoomData> readChatRoomData();
+ void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
+ Flux<Message> readChatRoomData(UUID chatRoomId);
}
import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.domain.ChatHome;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
ChatHome noneShardingChatHome(
ChatBackendProperties properties,
StorageStrategy storageStrategy,
+ ChatRoomServiceFactory chatRoomServiceFactory,
Clock clock)
{
return new SimpleChatHome(
- storageStrategy.readChatRoomData(),
+ storageStrategy,
+ chatRoomServiceFactory,
clock,
properties.getChatroomBufferSize());
}
package de.juplo.kafka.chat.backend.persistence.inmemory;
import de.juplo.kafka.chat.backend.domain.*;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public SimpleChatHome(
- Flux<ChatRoomInfo> chatRoomInfoFlux,
- ChatRoomService chatRoomService,
+ StorageStrategy storageStrategy,
+ ChatRoomServiceFactory chatRoomServiceFactory,
Clock clock,
int bufferSize)
{
this(
null,
- chatRoomInfoFlux,
- chatRoomService,
+ storageStrategy,
+ chatRoomServiceFactory,
clock,
bufferSize);
}
public SimpleChatHome(
Integer shard,
- Flux<ChatRoomInfo> chatRoomInfoFlux,
- ChatRoomService chatRoomService,
+ StorageStrategy storageStrategy,
+ ChatRoomServiceFactory chatRoomServiceFactory,
Clock clock,
int bufferSize)
{
this.shard = shard;
this.chatRoomInfo = new HashMap<>();
this.chatRoomData = new HashMap<>();
- chatRoomInfoFlux
+ storageStrategy
+ .readChatRoomInfo()
.filter(info ->
{
if (shard == null || info.getShard() == shard)
.toStream()
.forEach(info ->
{
- chatRoomInfo.put(info.getId(), info);
+ UUID chatRoomId = info.getId();
+ chatRoomInfo.put(chatRoomId, info);
+ Flux<Message> messageFlux =
+ storageStrategy.readChatRoomData(chatRoomId);
chatRoomData.put(
info.getId(),
- new ChatRoomData(chatRoomService, clock, bufferSize));
+ new ChatRoomData(
+ chatRoomId,
+ chatRoomServiceFactory.create(messageFlux),
+ clock,
+ bufferSize));
});
this.clock = clock;
this.bufferSize = bufferSize;
log.info("Creating ChatRoom with buffer-size {}", bufferSize);
ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
- ChatRoomData chatRoomData = new ChatRoomData(service, clock, bufferSize);
+ ChatRoomData chatRoomData = new ChatRoomData(id, service, clock, bufferSize);
this.chatRoomData.put(id, chatRoomData);
return Mono.just(chatRoomInfo);
}
throw new RuntimeException(e);
}
})
- .subscribe(chatroom ->
+ .subscribe(chatRoomInfo ->
{
try
{
- ChatRoomInfoTo infoTo = ChatRoomInfoTo.from(chatroom);
- generator.writeObject(infoTo);
+ ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo);
+ generator.writeObject(chatRoomInfoTo);
}
catch (IOException e)
{
}
@Override
- public void writeChatRoomData(Flux<ChatRoomData> chatRoomDataFlux)
+ public void writeChatRoomData(
+ UUID chatRoomId,
+ Flux<Message> messageFlux)
{
- Path path = chatroomsPath();
- log.info("Writing chatrooms to {}", path);
- try
- {
- Files.createDirectories(storagePath);
-
- JsonGenerator generator =
- mapper
- .getFactory()
- .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
-
- chatRoomDataFlux
- .log()
- .doFirst(() ->
- {
- try
- {
- generator.useDefaultPrettyPrinter();
- generator.writeStartArray();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- })
- .doOnTerminate(() ->
- {
- try
- {
- generator.writeEndArray();
- generator.close();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- })
- .subscribe(chatRoomData -> writeMessages(
- chatRoomData.getId(),
- chatRoomData.getMessages()));
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public Flux<ChatRoomData> readChatRoomData()
- {
- JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
- return Flux
- .from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
- .log()
- .map(infoTo ->
- {
- UUID chatRoomId = infoTo.getId();
- return new ChatRoomData(
- chatRoomId,
- factory.create(readMessages(chatRoomId)),
- clock,
- bufferSize);
- });
- }
-
- public void writeMessages(UUID id, Flux<Message> messageFlux)
- {
- Path path = chatroomPath(id);
- log.info("Writing messages for {} to {}", id, path);
+ Path path = chatroomPath(chatRoomId);
+ log.info("Writing messages for {} to {}", chatRoomId, path);
try
{
Files.createDirectories(storagePath);
}
}
- public Flux<Message> readMessages(UUID id)
+ @Override
+ public Flux<Message> readChatRoomData(UUID chatRoomId)
{
JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
return Flux
- .from(new JsonFilePublisher<MessageTo>(chatroomPath(id), mapper, type))
+ .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatRoomId), mapper, type))
.log()
.map(MessageTo::toMessage);
}
import org.springframework.data.mongodb.repository.MongoRepository;
+import java.util.List;
+
public interface MessageRepository extends MongoRepository<MessageTo, String>
{
+ List<MessageTo> findByChatRoomId(String chatRoomId);
}
import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
+import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Bean
public StorageStrategy storageStrategy(
ChatRoomRepository chatRoomRepository,
+ MessageRepository messageRepository,
ChatBackendProperties properties,
Clock clock,
- ShardingStrategy shardingStrategy)
+ ShardingStrategy shardingStrategy,
+ ChatRoomServiceFactory chatRoomServiceFactory)
{
return new MongoDbStorageStrategy(
chatRoomRepository,
+ messageRepository,
clock,
properties.getChatroomBufferSize(),
shardingStrategy,
- messageFlux -> new InMemoryChatRoomService(messageFlux));
+ chatRoomServiceFactory);
}
}
import de.juplo.kafka.chat.backend.domain.ChatRoomData;
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
}
@Override
- public void writeChatRoomData(Flux<ChatRoomData> chatRoomDataFlux)
+ public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
{
- chatRoomDataFlux
- .flatMap(ChatRoomTo::from)
- .subscribe(chatroomTo -> chatRoomRepository.save(chatroomTo));
+ messageFlux
+ .map(message -> MessageTo.from(message))
+ .subscribe(messageTo -> messageRepository.save(messageTo)); // TODO: Namespace <chatRoomId>
}
@Override
- public Flux<ChatRoomData> readChatRoomData()
+ public Flux<Message> readChatRoomData(UUID chatRoomId)
{
return Flux
- .fromIterable(chatRoomRepository.findAll())
- .map(chatRoomTo ->
- {
- UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
- int shard = shardingStrategy.selectShard(chatRoomId);
- return new ChatRoomData(
- clock,
- factory.create(
- Flux
- .fromIterable(chatRoomTo.getMessages())
- .map(messageTo -> messageTo.toMessage())),
- bufferSize);
- });
+ .fromIterable(messageRepository.findByChatRoomId(chatRoomId.toString()))
+ .map(messageTo -> messageTo.toMessage());
}
}
package de.juplo.kafka.chat.backend.persistence.storage.nostorage;
-import de.juplo.kafka.chat.backend.domain.ChatRoomData;
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Flux;
+import java.util.UUID;
+
@ConditionalOnProperty(
prefix = "chat.backend.inmemory",
}
@Override
- public void writeChatRoomData(Flux<ChatRoomData> chatRoomDataFlux) {}
+ public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux) {}
@Override
- public Flux<ChatRoomData> readChatRoomData()
+ public Flux<Message> readChatRoomData(UUID chatRoomId)
{
return Flux.empty();
}