@Autowired
ChatBackendProperties properties;
@Autowired
- ChatHome[] chatHomes;
+ ChatHome chatHome;
@Autowired
StorageStrategy storageStrategy;
@PreDestroy
public void onExit()
{
- for (int shard = 0; shard < chatHomes.length; shard++)
- storageStrategy.write(chatHomes[shard].getChatRooms());
+ storageStrategy.write(chatHome);
}
public static void main(String[] args)
package de.juplo.kafka.chat.backend.api;
import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomData;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import lombok.RequiredArgsConstructor;
import org.springframework.http.codec.ServerSentEvent;
public Flux<ChatRoomInfoTo> list()
{
return chatHome
- .getChatRooms()
- .map(chatroom -> ChatRoomInfoTo.from(chatroom));
+ .getChatRoomInfo()
+ .map(chatroomInfo -> ChatRoomInfoTo.from(chatroomInfo));
}
- @GetMapping("{chatroomId}/list")
- public Flux<MessageTo> list(@PathVariable UUID chatroomId)
+ @GetMapping("{chatRoomId}/list")
+ public Flux<MessageTo> list(@PathVariable UUID chatRoomId)
{
return chatHome
- .getChatRoom(chatroomId)
- .flatMapMany(chatroom -> chatroom
+ .getChatRoomData(chatRoomId)
+ .flatMapMany(chatRoomData -> chatRoomData
.getMessages()
.map(MessageTo::from));
}
- @GetMapping("{chatroomId}")
- public Mono<ChatRoomInfoTo> get(@PathVariable UUID chatroomId)
+ @GetMapping("{chatRoomId}")
+ public Mono<ChatRoomInfoTo> get(@PathVariable UUID chatRoomId)
{
return chatHome
- .getChatRoom(chatroomId)
- .map(chatroom -> ChatRoomInfoTo.from(chatroom));
+ .getChatRoomInfo(chatRoomId)
+ .map(chatRoomInfo -> ChatRoomInfoTo.from(chatRoomInfo));
}
- @PutMapping("{chatroomId}/{username}/{messageId}")
+ @PutMapping("{chatRoomId}/{username}/{messageId}")
public Mono<MessageTo> put(
- @PathVariable UUID chatroomId,
+ @PathVariable UUID chatRoomId,
@PathVariable String username,
@PathVariable Long messageId,
@RequestBody String text)
{
return
chatHome
- .getChatRoom(chatroomId)
- .flatMap(chatroom -> put(chatroom, username, messageId, text));
+ .getChatRoomData(chatRoomId)
+ .flatMap(chatRoomData -> put(chatRoomData, username, messageId, text));
}
private Mono<MessageTo> put(
- ChatRoom chatroom,
+ ChatRoomData chatRoomData,
String username,
Long messageId,
String text)
{
return
- chatroom
+ chatRoomData
.addMessage(
messageId,
username,
.map(message -> MessageTo.from(message));
}
- @GetMapping("{chatroomId}/{username}/{messageId}")
+ @GetMapping("{chatRoomId}/{username}/{messageId}")
public Mono<MessageTo> get(
- @PathVariable UUID chatroomId,
+ @PathVariable UUID chatRoomId,
@PathVariable String username,
@PathVariable Long messageId)
{
return
chatHome
- .getChatRoom(chatroomId)
- .flatMap(chatroom -> get(chatroom, username, messageId));
+ .getChatRoomData(chatRoomId)
+ .flatMap(chatRoomData -> get(chatRoomData, username, messageId));
}
private Mono<MessageTo> get(
- ChatRoom chatroom,
+ ChatRoomData chatRoomData,
String username,
Long messageId)
{
return
- chatroom
+ chatRoomData
.getMessage(username, messageId)
.map(message -> MessageTo.from(message));
}
- @GetMapping(path = "{chatroomId}/listen")
- public Flux<ServerSentEvent<MessageTo>> listen(@PathVariable UUID chatroomId)
+ @GetMapping(path = "{chatRoomId}/listen")
+ public Flux<ServerSentEvent<MessageTo>> listen(@PathVariable UUID chatRoomId)
{
return chatHome
- .getChatRoom(chatroomId)
- .flatMapMany(chatroom -> listen(chatroom));
+ .getChatRoomData(chatRoomId)
+ .flatMapMany(chatRoomData -> listen(chatRoomData));
}
- private Flux<ServerSentEvent<MessageTo>> listen(ChatRoom chatroom)
+ private Flux<ServerSentEvent<MessageTo>> listen(ChatRoomData chatRoomData)
{
- return chatroom
+ return chatRoomData
.listen()
.log()
.map(message -> MessageTo.from(message))
@PostMapping("/store")
public void store()
{
- storageStrategy.write(chatHome.getChatRooms());
+ storageStrategy.write(chatHome);
}
}
{
Mono<ChatRoomInfo> createChatRoom(UUID id, String name);
- Mono<ChatRoom> getChatRoom(UUID id);
+ Mono<ChatRoomInfo> getChatRoomInfo(UUID id);
- Flux<ChatRoom> getChatRooms();
+ Flux<ChatRoomInfo> getChatRoomInfo();
+
+ Mono<ChatRoomData> getChatRoomData(UUID id);
}
@Slf4j
-public class ChatRoom
+public class ChatRoomData
{
public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$");
- private final Clock clock;
private final ChatRoomService service;
+ private final Clock clock;
private final int bufferSize;
private Sinks.Many<Message> sink;
- public ChatRoom(
+ public ChatRoomData(
Clock clock,
ChatRoomService service,
int bufferSize)
package de.juplo.kafka.chat.backend.persistence;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
import reactor.core.publisher.Flux;
+import java.util.UUID;
+
public interface StorageStrategy
{
- void write(Flux<ChatRoom> chatroomFlux);
- Flux<ChatRoom> read();
+ default void write(ChatHome chatHome)
+ {
+ writeChatRoomInfo(
+ chatHome
+ .getChatRoomInfo()
+ .doOnNext(chatRoomInfo -> writeChatRoomData(
+ chatRoomInfo.getId(),
+ chatHome
+ .getChatRoomData(chatRoomInfo.getId())
+ .flatMapMany(chatRoomData -> chatRoomData.getMessages()))));
+ }
+
+ void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
+ Flux<ChatRoomInfo> readChatRoomInfo();
+ void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
+ Flux<Message> readChatRoomData(UUID chatRoomId);
}
Clock clock)
{
return new SimpleChatHome(
- storageStrategy.read(),
+ storageStrategy,
clock,
properties.getChatroomBufferSize());
}
.of(properties.getInmemory().getOwnedShards())
.forEach(shard -> chatHomes[shard] = new SimpleChatHome(
shard,
- storageStrategy.read(),
+ storageStrategy,
clock,
properties.getChatroomBufferSize()));
ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
@Slf4j
public class ShardedChatHome implements ChatHome
{
- private final ChatHome[] chatHomes;
+ private final SimpleChatHome[] chatHomes;
private final Set<Integer> ownedShards;
private final ShardingStrategy shardingStrategy;
public ShardedChatHome(
- ChatHome[] chatHomes,
+ SimpleChatHome[] chatHomes,
ShardingStrategy shardingStrategy)
{
this.chatHomes = chatHomes;
}
@Override
- public Mono<ChatRoom> getChatRoom(UUID id)
+ public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
{
int shard = selectShard(id);
return chatHomes[shard] == null
? Mono.error(new ShardNotOwnedException(shard))
: chatHomes[shard]
- .getChatRoom(id)
+ .getChatRoomInfo(id)
.onErrorMap(throwable -> throwable instanceof UnknownChatroomException
? new UnknownChatroomException(
id,
}
@Override
- public Flux<ChatRoom> getChatRooms()
+ public Flux<ChatRoomInfo> getChatRoomInfo()
{
return Flux
.fromIterable(ownedShards)
- .flatMap(shard -> chatHomes[shard].getChatRooms());
+ .flatMap(shard -> chatHomes[shard].getChatRoomInfo());
}
+ @Override
+ public Mono<ChatRoomData> getChatRoomData(UUID id)
+ {
+ int shard = selectShard(id);
+ return chatHomes[shard] == null
+ ? Mono.error(new ShardNotOwnedException(shard))
+ : chatHomes[shard]
+ .getChatRoomData(id)
+ .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
+ ? new UnknownChatroomException(
+ id,
+ shard,
+ ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
+ : throwable);
+ }
+
+ public Flux<ChatRoomData> getChatRoomData()
+ {
+ return Flux
+ .fromIterable(ownedShards)
+ .flatMap(shard -> chatHomes[shard].getChatRoomData());
+ }
+
+
private int selectShard(UUID chatroomId)
{
package de.juplo.kafka.chat.backend.persistence.inmemory;
import de.juplo.kafka.chat.backend.domain.*;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class SimpleChatHome implements ChatHome
{
private final Integer shard;
- private final Map<UUID, ChatRoom> chatRooms;
+ private final Map<UUID, ChatRoomInfo> chatRoomInfo;
+ private final Map<UUID, ChatRoomData> chatRoomData;
private final Clock clock;
private final int bufferSize;
public SimpleChatHome(
- Flux<ChatRoom> chatroomFlux,
+ StorageStrategy storageStrategy,
Clock clock,
int bufferSize)
{
- this(null, chatroomFlux, clock, bufferSize);
+ this(
+ null,
+ storageStrategy,
+ clock,
+ bufferSize);
}
public SimpleChatHome(
Integer shard,
- Flux<ChatRoom> chatroomFlux,
+ StorageStrategy storageStrategy,
Clock clock,
int bufferSize)
{
log.info("Created SimpleChatHome for shard {}", shard);
;
this.shard = shard;
- this.chatRooms = new HashMap<>();
- chatroomFlux
- .filter(chatRoom ->
+ this.chatRoomInfo = new HashMap<>();
+ this.chatRoomData = new HashMap<>();
+ storageStrategy
+ .readChatRoomInfo()
+ .filter(info ->
{
- if (shard == null || chatRoom.getShard() == shard)
+ if (shard == null || info.getShard() == shard)
{
return true;
}
log.info(
"SimpleChatHome for shard {} ignores not owned chat-room {}",
shard,
- chatRoom);
+ info);
return false;
}
})
.toStream()
- .forEach(chatroom -> chatRooms.put(chatroom.getId(), chatroom));
+ .forEach(info ->
+ {
+ UUID chatRoomId = info.getId();
+ chatRoomInfo.put(chatRoomId, info);
+ Flux<Message> messageFlux =
+ storageStrategy.readChatRoomData(chatRoomId);
+ chatRoomData.put(
+ info.getId(),
+ new ChatRoomData(
+ clock,
+ new InMemoryChatRoomService(messageFlux),
+ bufferSize));
+ });
this.clock = clock;
this.bufferSize = bufferSize;
}
{
log.info("Creating ChatRoom with buffer-size {}", bufferSize);
ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
- ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
- chatRooms.put(id, chatRoom);
- return Mono.just(chatRoom);
+ ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
+ this.chatRoomInfo.put(id, chatRoomInfo);
+ ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
+ this.chatRoomData.put(id, chatRoomData);
+ return Mono.just(chatRoomInfo);
}
@Override
- public Mono<ChatRoom> getChatRoom(UUID id)
+ public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
{
return Mono
- .justOrEmpty(chatRooms.get(id))
+ .justOrEmpty(chatRoomInfo.get(id))
.switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
}
@Override
- public Flux<ChatRoom> getChatRooms()
+ public Flux<ChatRoomInfo> getChatRoomInfo()
+ {
+ return Flux.fromIterable(chatRoomInfo.values());
+ }
+
+ @Override
+ public Mono<ChatRoomData> getChatRoomData(UUID id)
+ {
+ return Mono
+ .justOrEmpty(chatRoomData.get(id))
+ .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
+ }
+
+ public Flux<ChatRoomData> getChatRoomData()
{
- return Flux.fromIterable(chatRooms.values());
+ return Flux.fromIterable(chatRoomData.values());
}
}
private final boolean[] isShardOwned;
private final long[] currentOffset;
private final long[] nextOffset;
- private final Map<UUID, ChatRoom>[] chatrooms;
+ private final Map<UUID, ChatRoomInfo>[] chatRoomInfo;
+ private final Map<UUID, ChatRoomData>[] chatRoomData;
private boolean running;
@Getter
this.isShardOwned = new boolean[numShards];
this.currentOffset = new long[numShards];
this.nextOffset = new long[numShards];
- this.chatrooms = new Map[numShards];
+ this.chatRoomInfo = new Map[numShards];
+ this.chatRoomData = new Map[numShards];
IntStream
.range(0, numShards)
- .forEach(shard -> this.chatrooms[shard] = new HashMap<>());
+ .forEach(shard ->
+ {
+ this.chatRoomInfo[shard] = new HashMap<>();
+ this.chatRoomData[shard] = new HashMap<>();
+ });
}
private void createChatRoom(
UUID chatRoomId,
CommandCreateChatRoomTo createChatRoomRequestTo,
- int partition)
+ Integer partition)
{
- log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize);
- KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
- ChatRoom chatRoom = new ChatRoom(
+ log.info(
+ "Loading ChatRoom {} for shard {} with buffer-size {}",
chatRoomId,
- createChatRoomRequestTo.getName(),
partition,
+ bufferSize);
+ KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
+ ChatRoomData chatRoomData = new ChatRoomData(
clock,
service,
bufferSize);
- putChatRoom(chatRoom);
+ putChatRoom(
+ chatRoomId,
+ createChatRoomRequestTo.getName(),
+ partition,
+ chatRoomData);
}
private void createChatRoom(ChatRoomInfo chatRoomInfo)
{
UUID id = chatRoomInfo.getId();
- String name = chatRoomInfo.getName();
- int shard = chatRoomInfo.getShard();
log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
KafkaChatRoomService service = new KafkaChatRoomService(this, id);
- ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
- putChatRoom(chatRoom);
+ ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
+ putChatRoom(
+ chatRoomInfo.getId(),
+ chatRoomInfo.getName(),
+ chatRoomInfo.getShard(),
+ chatRoomData);
}
private void loadChatMessage(
Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
- ChatRoom chatRoom = chatrooms[partition].get(chatRoomId);
+ ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId);
KafkaChatRoomService kafkaChatRoomService =
- (KafkaChatRoomService) chatRoom.getChatRoomService();
+ (KafkaChatRoomService) chatRoomData.getChatRoomService();
kafkaChatRoomService.persistMessage(message);
}
}
- private void putChatRoom(ChatRoom chatRoom)
+ private void putChatRoom(
+ UUID chatRoomId,
+ String name,
+ Integer partition,
+ ChatRoomData chatRoomData)
{
- Integer partition = chatRoom.getShard();
- UUID chatRoomId = chatRoom.getId();
- if (chatrooms[partition].containsKey(chatRoomId))
+ if (this.chatRoomInfo[partition].containsKey(chatRoomId))
{
- log.warn("Ignoring existing chat-room: " + chatRoom);
+ log.warn(
+ "Ignoring existing chat-room for {}: {}",
+ partition,
+ chatRoomId);
}
else
{
log.info(
"Adding new chat-room to partition {}: {}",
partition,
- chatRoom);
+ chatRoomData);
- chatrooms[partition].put(chatRoomId, chatRoom);
+ this.chatRoomInfo[partition].put(
+ chatRoomId,
+ new ChatRoomInfo(chatRoomId, name, partition));
+ this.chatRoomData[partition].put(chatRoomId, chatRoomData);
}
}
.toArray();
}
- Mono<ChatRoom> getChatRoom(int shard, UUID id)
+ Mono<ChatRoomData> getChatRoomData(int shard, UUID id)
+ {
+ if (loadInProgress)
+ {
+ return Mono.error(new LoadInProgressException());
+ }
+
+ if (!isShardOwned[shard])
+ {
+ return Mono.error(new ShardNotOwnedException(shard));
+ }
+
+ return Mono.justOrEmpty(chatRoomData[shard].get(id));
+ }
+
+ Flux<ChatRoomInfo> getChatRoomInfo()
+ {
+ return Flux
+ .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
+ .filter(shard -> isShardOwned[shard])
+ .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values()));
+ }
+
+ Mono<ChatRoomInfo> getChatRoomInfo(int shard, UUID id)
{
if (loadInProgress)
{
return Mono.error(new ShardNotOwnedException(shard));
}
- return Mono.justOrEmpty(chatrooms[shard].get(id));
+ return Mono.justOrEmpty(chatRoomInfo[shard].get(id));
}
- Flux<ChatRoom> getChatRooms()
+ Flux<ChatRoomData> getChatRoomData()
{
return Flux
.fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
.filter(shard -> isShardOwned[shard])
- .flatMap(shard -> Flux.fromIterable(chatrooms[shard].values()));
+ .flatMap(shard -> Flux.fromIterable(chatRoomData[shard].values()));
}
}
package de.juplo.kafka.chat.backend.persistence.kafka;
import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomData;
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.domain.UnknownChatroomException;
import lombok.RequiredArgsConstructor;
}
@Override
- public Mono<ChatRoom> getChatRoom(UUID id)
+ public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
{
int shard = selectShard(id);
return chatRoomChannel
- .getChatRoom(shard, id)
+ .getChatRoomInfo(shard, id)
.switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
id,
shard,
chatRoomChannel.getOwnedShards())));
}
- int selectShard(UUID chatRoomId)
+ @Override
+ public Flux<ChatRoomInfo> getChatRoomInfo()
{
- byte[] serializedKey = chatRoomId.toString().getBytes();
- return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
+ return chatRoomChannel.getChatRoomInfo();
}
@Override
- public Flux<ChatRoom> getChatRooms()
+ public Mono<ChatRoomData> getChatRoomData(UUID id)
{
- return chatRoomChannel.getChatRooms();
+ int shard = selectShard(id);
+ return chatRoomChannel
+ .getChatRoomData(shard, id)
+ .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
+ id,
+ shard,
+ chatRoomChannel.getOwnedShards())));
+ }
+
+ public Flux<ChatRoomData> getChatRoomData()
+ {
+ return chatRoomChannel.getChatRoomData();
+ }
+
+ int selectShard(UUID chatRoomId)
+ {
+ byte[] serializedKey = chatRoomId.toString().getBytes();
+ return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
}
}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.storage.files;
-
-import de.juplo.kafka.chat.backend.domain.ChatRoomService;
-import de.juplo.kafka.chat.backend.domain.Message;
-import reactor.core.publisher.Flux;
-
-
-public interface ChatRoomServiceFactory
-{
- ChatRoomService create(Flux<Message> messageFlux);
-}
import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.data.mongo.MongoRepositoriesAutoConfiguration;
import org.springframework.context.annotation.Configuration;
import java.nio.file.Paths;
-import java.time.Clock;
@ConditionalOnProperty(
@Bean
public StorageStrategy storageStrategy(
ChatBackendProperties properties,
- Clock clock,
ShardingStrategy shardingStrategy,
ObjectMapper mapper)
{
return new FilesStorageStrategy(
Paths.get(properties.getInmemory().getStorageDirectory()),
- clock,
- properties.getChatroomBufferSize(),
shardingStrategy,
- messageFlux -> new InMemoryChatRoomService(messageFlux),
mapper);
}
}
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
import de.juplo.kafka.chat.backend.api.MessageTo;
-import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.time.Clock;
import java.util.UUID;
import static java.nio.file.StandardOpenOption.CREATE;
private final Path storagePath;
- private final Clock clock;
- private final int bufferSize;
private final ShardingStrategy shardingStrategy;
- private final ChatRoomServiceFactory factory;
private final ObjectMapper mapper;
@Override
- public void write(Flux<ChatRoom> chatroomFlux)
+ public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
{
Path path = chatroomsPath();
log.info("Writing chatrooms to {}", path);
.getFactory()
.createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
- chatroomFlux
+ chatRoomInfoFlux
.log()
.doFirst(() ->
{
throw new RuntimeException(e);
}
})
- .subscribe(chatroom ->
+ .subscribe(chatRoomInfo ->
{
try
{
- ChatRoomInfoTo infoTo = ChatRoomInfoTo.from(chatroom);
- generator.writeObject(infoTo);
- writeMessages(infoTo, chatroom.getMessages());
+ ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo);
+ generator.writeObject(chatRoomInfoTo);
}
catch (IOException e)
{
}
@Override
- public Flux<ChatRoom> read()
+ public Flux<ChatRoomInfo> readChatRoomInfo()
{
JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
return Flux
.from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
.log()
- .map(infoTo ->
+ .map(chatRoomInfoTo ->
{
- UUID chatRoomId = infoTo.getId();
+ UUID chatRoomId = chatRoomInfoTo.getId();
int shard = shardingStrategy.selectShard(chatRoomId);
- return new ChatRoom(
- infoTo.getId(),
- infoTo.getName(),
- shard,
- clock,
- factory.create(readMessages(infoTo)),
- bufferSize);
+
+ log.info(
+ "{} - old shard: {}, new shard: {}",
+ chatRoomId,
+ chatRoomInfoTo.getShard(),
+ shard);
+
+ return new ChatRoomInfo(
+ chatRoomId,
+ chatRoomInfoTo.getName(),
+ shard);
});
}
- public void writeMessages(ChatRoomInfoTo infoTo, Flux<Message> messageFlux)
+ @Override
+ public void writeChatRoomData(
+ UUID chatRoomId,
+ Flux<Message> messageFlux)
{
- Path path = chatroomPath(infoTo);
- log.info("Writing messages for {} to {}", infoTo, path);
+ Path path = chatroomPath(chatRoomId);
+ log.info("Writing messages for {} to {}", chatRoomId, path);
try
{
Files.createDirectories(storagePath);
}
}
- public Flux<Message> readMessages(ChatRoomInfoTo infoTo)
+ @Override
+ public Flux<Message> readChatRoomData(UUID chatRoomId)
{
JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
return Flux
- .from(new JsonFilePublisher<MessageTo>(chatroomPath(infoTo), mapper, type))
+ .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatRoomId), mapper, type))
.log()
.map(MessageTo::toMessage);
}
return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
}
- Path chatroomPath(ChatRoomInfoTo infoTo)
+ Path chatroomPath(UUID id)
{
- return storagePath.resolve(Path.of(infoTo.getId().toString() + ".json"));
+ return storagePath.resolve(Path.of(id.toString() + ".json"));
}
}
package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import lombok.*;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
-import java.util.List;
-
@AllArgsConstructor
@NoArgsConstructor
@Getter(AccessLevel.PACKAGE)
@Setter(AccessLevel.PACKAGE)
@EqualsAndHashCode(of = { "id" })
-@ToString(of = { "id", "name" })
+@ToString(of = { "id", "shard", "name" })
@Document
public class ChatRoomTo
{
@Id
private String id;
+ private Integer shard;
private String name;
- private List<MessageTo> messages;
- public static ChatRoomTo from(ChatRoom chatroom)
+ public static ChatRoomTo from(ChatRoomInfo chatRoomInfo)
{
return new ChatRoomTo(
- chatroom.getId().toString(),
- chatroom.getName(),
- chatroom
- .getMessages()
- .map(MessageTo::from)
- .collectList()
- .block());
+ chatRoomInfo.getId().toString(),
+ chatRoomInfo.getShard(),
+ chatRoomInfo.getName());
}
}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+
+import org.springframework.data.mongodb.repository.MongoRepository;
+
+import java.util.List;
+
+
+public interface MessageRepository extends MongoRepository<MessageTo, String>
+{
+ List<MessageTo> findByChatRoomIdOrderBySerialAsc(String chatRoomId);
+}
import de.juplo.kafka.chat.backend.domain.Message;
import lombok.*;
+import org.springframework.data.mongodb.core.index.Indexed;
+import org.springframework.data.mongodb.core.mapping.Document;
+import org.springframework.data.mongodb.core.mapping.Field;
import java.time.LocalDateTime;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import java.util.UUID;
@AllArgsConstructor
@NoArgsConstructor
@Getter(AccessLevel.PACKAGE)
@Setter(AccessLevel.PACKAGE)
-@EqualsAndHashCode(of = { "user", "id" })
-@ToString(of = { "user", "id" })
+@EqualsAndHashCode(of = { "chatRoomId", "user", "id" })
+@ToString(of = { "chatRoomId", "user", "id" })
+@Document
class MessageTo
{
- final static Pattern SPLIT_ID = Pattern.compile("^([a-z-0-9]+)--([0-9]+)$");
- private String id;
+ @Indexed
+ private String chatRoomId;
+ @Indexed
+ private String user;
+ @Field("id")
+ @Indexed
+ private Long id;
+ @Indexed
private Long serial;
private String time;
private String text;
Message toMessage()
{
- Matcher matcher = SPLIT_ID.matcher(id);
- if (!matcher.matches())
- throw new RuntimeException("MessageTo with invalid ID: " + id);
- Long messageId = Long.parseLong(matcher.group(2));
- String user = matcher.group(1);
return new Message(
- Message.MessageKey.of(user, messageId),
+ Message.MessageKey.of(user, id),
serial,
LocalDateTime.parse(time),
text);
}
- static MessageTo from(Message message)
+ static MessageTo from(UUID chatRoomId, Message message)
{
return
new MessageTo(
- message.getUsername() + "--" + message.getId(),
+ chatRoomId.toString(),
+ message.getUsername(),
+ message.getId(),
message.getSerialNumber(),
message.getTimestamp().toString(),
message.getMessageText());
package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
-import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import java.time.Clock;
-
@ConditionalOnProperty(
prefix = "chat.backend.inmemory",
@Bean
public StorageStrategy storageStrategy(
ChatRoomRepository chatRoomRepository,
- ChatBackendProperties properties,
- Clock clock,
+ MessageRepository messageRepository,
ShardingStrategy shardingStrategy)
{
return new MongoDbStorageStrategy(
chatRoomRepository,
- clock,
- properties.getChatroomBufferSize(),
- shardingStrategy,
- messageFlux -> new InMemoryChatRoomService(messageFlux));
+ messageRepository,
+ shardingStrategy);
}
}
package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
-import java.time.Clock;
import java.util.UUID;
@Slf4j
public class MongoDbStorageStrategy implements StorageStrategy
{
- private final ChatRoomRepository repository;
- private final Clock clock;
- private final int bufferSize;
+ private final ChatRoomRepository chatRoomRepository;
+ private final MessageRepository messageRepository;
private final ShardingStrategy shardingStrategy;
- private final ChatRoomServiceFactory factory;
@Override
- public void write(Flux<ChatRoom> chatroomFlux)
+ public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux)
{
- chatroomFlux
+ chatRoomInfoFlux
.map(ChatRoomTo::from)
- .subscribe(chatroomTo -> repository.save(chatroomTo));
+ .subscribe(chatroomTo -> chatRoomRepository.save(chatroomTo));
}
@Override
- public Flux<ChatRoom> read()
+ public Flux<ChatRoomInfo> readChatRoomInfo()
{
return Flux
- .fromIterable(repository.findAll())
+ .fromIterable(chatRoomRepository.findAll())
.map(chatRoomTo ->
{
UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
int shard = shardingStrategy.selectShard(chatRoomId);
- return new ChatRoom(
+
+ log.info(
+ "{} - old shard: {}, new shard: {}",
+ chatRoomId,
+ chatRoomTo.getShard(),
+ shard);
+
+ return new ChatRoomInfo(
chatRoomId,
chatRoomTo.getName(),
- shard,
- clock,
- factory.create(
- Flux
- .fromIterable(chatRoomTo.getMessages())
- .map(messageTo -> messageTo.toMessage())),
- bufferSize);
+ shard);
});
}
+
+ @Override
+ public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux)
+ {
+ messageFlux
+ .map(message -> MessageTo.from(chatRoomId, message))
+ .subscribe(messageTo -> messageRepository.save(messageTo));
+ }
+
+ @Override
+ public Flux<Message> readChatRoomData(UUID chatRoomId)
+ {
+ return Flux
+ .fromIterable(messageRepository.findByChatRoomIdOrderBySerialAsc(chatRoomId.toString()))
+ .map(messageTo -> messageTo.toMessage());
+ }
}
package de.juplo.kafka.chat.backend.persistence.storage.nostorage;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Flux;
+import java.util.UUID;
+
@ConditionalOnProperty(
prefix = "chat.backend.inmemory",
return new StorageStrategy()
{
@Override
- public void write(Flux<ChatRoom> chatroomFlux) {}
+ public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux) {}
+
+ @Override
+ public Flux<ChatRoomInfo> readChatRoomInfo()
+ {
+ return Flux.empty();
+ }
+
+ @Override
+ public void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux) {}
@Override
- public Flux<ChatRoom> read()
+ public Flux<Message> readChatRoomData(UUID chatRoomId)
{
return Flux.empty();
}
{
// Given
UUID chatroomId = UUID.randomUUID();
- when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
+ when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
// When
WebTestClient.ResponseSpec responseSpec = client
{
// Given
UUID chatroomId = UUID.randomUUID();
- when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
+ when(chatHome.getChatRoomInfo(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
// When
WebTestClient.ResponseSpec responseSpec = client
UUID chatroomId = UUID.randomUUID();
String username = "foo";
Long messageId = 66l;
- when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
+ when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
// When
WebTestClient.ResponseSpec responseSpec = client
UUID chatroomId = UUID.randomUUID();
String username = "foo";
Long messageId = 66l;
- when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
+ when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
// When
WebTestClient.ResponseSpec responseSpec = client
{
// Given
UUID chatroomId = UUID.randomUUID();
- when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
+ when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
// When
WebTestClient.ResponseSpec responseSpec = client
LocalDateTime timeExistingMessage = LocalDateTime.parse(timeExistingMessageAsString);
String textExistingMessage = "Existing";
String textMutatedMessage = "Mutated!";
- ChatRoom chatRoom = new ChatRoom(
- chatroomId,
- "Test-ChatRoom",
- 0,
+ ChatRoomData chatRoomData = new ChatRoomData(
Clock.systemDefaultZone(),
- chatRoomService, 8);
- when(chatHome.getChatRoom(eq(chatroomId))).thenReturn(Mono.just(chatRoom));
+ chatRoomService,
+ 8);
+ when(chatHome.getChatRoomData(eq(chatroomId))).thenReturn(Mono.just(chatRoomData));
Message existingMessage = new Message(
key,
serialNumberExistingMessage,
Long messageId = 66l;
Message.MessageKey key = Message.MessageKey.of(user, messageId);
String textMessage = "Hallo Welt";
- ChatRoom chatRoom = new ChatRoom(
- chatroomId,
- "Test-ChatRoom",
- 0,
+ ChatRoomData chatRoomData = new ChatRoomData(
Clock.systemDefaultZone(),
- chatRoomService, 8);
- when(chatHome.getChatRoom(any(UUID.class)))
- .thenReturn(Mono.just(chatRoom));
+ chatRoomService,
+ 8);
+ when(chatHome.getChatRoomData(any(UUID.class)))
+ .thenReturn(Mono.just(chatRoomData));
when(chatRoomService.getMessage(any(Message.MessageKey.class)))
.thenReturn(Mono.empty());
// Needed for readable error-reports, in case of a bug that leads to according unwanted call
// Given
UUID chatroomId = UUID.randomUUID();
int shard = 666;
- when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+ when(chatHome.getChatRoomInfo(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
// When
WebTestClient.ResponseSpec responseSpec = client
// Given
UUID chatroomId = UUID.randomUUID();
int shard = 666;
- when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+ when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
// When
WebTestClient.ResponseSpec responseSpec = client
String username = "foo";
Long messageId = 66l;
int shard = 666;
- when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+ when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
// When
WebTestClient.ResponseSpec responseSpec = client
String username = "foo";
Long messageId = 66l;
int shard = 666;
- when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+ when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
// When
WebTestClient.ResponseSpec responseSpec = client
// Given
UUID chatroomId = UUID.randomUUID();
int shard = 666;
- when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+ when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
// When
WebTestClient.ResponseSpec responseSpec = client
UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
// When
- Mono<ChatRoom> mono = Mono
- .defer(() -> chatHome.getChatRoom(chatRoomId))
+ Mono<ChatRoomData> mono = Mono
+ .defer(() -> chatHome.getChatRoomData(chatRoomId))
.log("testGetExistingChatroom")
.retryWhen(Retry
.backoff(5, Duration.ofSeconds(1))
UUID chatRoomId = UUID.fromString("7f59ec77-832e-4a17-8d22-55ef46242c17");
// When
- Mono<ChatRoom> mono = Mono
- .defer(() -> chatHome.getChatRoom(chatRoomId))
+ Mono<ChatRoomData> mono = Mono
+ .defer(() -> chatHome.getChatRoomData(chatRoomId))
.log("testGetNonExistentChatroom")
.retryWhen(Retry
.backoff(5, Duration.ofSeconds(1))
UUID chatRoomId = UUID.fromString("4e7246a6-29ae-43ea-b56f-669c3481ac19");
// When
- Mono<ChatRoom> mono = Mono
- .defer(() -> chatHome.getChatRoom(chatRoomId))
+ Mono<ChatRoomData> mono = Mono
+ .defer(() -> chatHome.getChatRoomData(chatRoomId))
.log("testGetChatroomForNotOwnedShard")
.retryWhen(Retry
.backoff(5, Duration.ofSeconds(1))
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
-import java.util.UUID;
import static org.mockito.Mockito.*;
import static pl.rzrz.assertj.reactor.Assertions.assertThat;
-public class ChatRoomTest
+public class ChatRoomDataTest
{
@Test
@DisplayName("Assert, that Mono emits expected message, if it exists")
String user = "foo";
Long messageId = 1l;
ChatRoomService chatRoomService = mock(ChatRoomService.class);
- ChatRoom chatRoom = new ChatRoom(
- UUID.randomUUID(),
- "Foo",
- 0,
+ ChatRoomData chatRoomData = new ChatRoomData(
Clock.systemDefaultZone(),
chatRoomService,
8);
when(chatRoomService.getMessage(any(Message.MessageKey.class))).thenReturn(Mono.just(message));
// When
- Mono<Message> mono = chatRoom.getMessage(user, messageId);
+ Mono<Message> mono = chatRoomData.getMessage(user, messageId);
// Then
assertThat(mono).emitsExactly(message);
String user = "foo";
Long messageId = 1l;
ChatRoomService chatRoomService = mock(ChatRoomService.class);
- ChatRoom chatRoom = new ChatRoom(
- UUID.randomUUID(),
- "Foo",
- 0,
+ ChatRoomData chatRoomData = new ChatRoomData(
Clock.systemDefaultZone(),
chatRoomService,
8);
when(chatRoomService.getMessage(any(Message.MessageKey.class))).thenReturn(Mono.empty());
// When
- Mono<Message> mono = chatRoom.getMessage(user, messageId);
+ Mono<Message> mono = chatRoomData.getMessage(user, messageId);
// Then
assertThat(mono).emitsCount(0);
String user = "foo";
Long messageId = 1l;
ChatRoomService chatRoomService = mock(ChatRoomService.class);
- ChatRoom chatRoom = new ChatRoom(
- UUID.randomUUID(),
- "Foo",
- 0,
+ ChatRoomData chatRoomData = new ChatRoomData(
Clock.systemDefaultZone(),
chatRoomService,
8);
when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(Mono.just(message));
// When
- Mono<Message> mono = chatRoom.addMessage(messageId, user, messageText);
+ Mono<Message> mono = chatRoomData.addMessage(messageId, user, messageText);
// Then
assertThat(mono).emitsExactly(message);
String user = "foo";
Long messageId = 1l;
ChatRoomService chatRoomService = mock(ChatRoomService.class);
- ChatRoom chatRoom = new ChatRoom(
- UUID.randomUUID(),
- "Foo",
- 0,
+ ChatRoomData chatRoomData = new ChatRoomData(
Clock.systemDefaultZone(),
chatRoomService,
8);
when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(Mono.just(message));
// When
- Mono<Message> mono = chatRoom.addMessage(messageId, user, messageText);
+ Mono<Message> mono = chatRoomData.addMessage(messageId, user, messageText);
// Then
assertThat(mono).emitsExactly(message);
String user = "foo";
Long messageId = 1l;
ChatRoomService chatRoomService = mock(ChatRoomService.class);
- ChatRoom chatRoom = new ChatRoom(
- UUID.randomUUID(),
- "Foo",
- 0,
+ ChatRoomData chatRoomData = new ChatRoomData(
Clock.systemDefaultZone(),
chatRoomService,
8);
when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(Mono.just(message));
// When
- Mono<Message> mono = chatRoom.addMessage(messageId, user, mutatedText);
+ Mono<Message> mono = chatRoomData.addMessage(messageId, user, mutatedText);
// Then
assertThat(mono).sendsError();
int bufferSize = 8;
SimpleChatHome simpleChatHome = new SimpleChatHome(
- getStorageStrategy().read(),
+ getStorageStrategy(),
clock,
bufferSize);
protected void stop()
{
- getStorageStrategy().write(chathome.getChatRooms());
+ getStorageStrategy().write(chathome);
}
@Test
{
start();
- assertThat(chathome.getChatRooms().toStream()).hasSize(0);
+ assertThat(chathome.getChatRoomInfo().toStream()).hasSize(0);
UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
ChatRoomInfo info = chathome.createChatRoom(chatRoomId, "FOO").block();
log.debug("Created chat-room {}", info);
- ChatRoom chatroom = chathome.getChatRoom(chatRoomId).block();
+ ChatRoomData chatroom = chathome.getChatRoomData(chatRoomId).block();
Message m1 = chatroom.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block();
Message m2 = chatroom.addMessage(1l, "ute", "Ich bin Ute...").block();
Message m3 = chatroom.addMessage(2l, "peter", "Willst du mit mir gehen?").block();
Message m4 = chatroom.addMessage(1l, "klaus", "Ja? Nein? Vielleicht??").block();
-
- assertThat(chathome.getChatRooms().toStream()).containsExactlyElementsOf(List.of(chatroom));
- assertThat(chathome.getChatRoom(chatRoomId)).emitsExactly(chatroom);
+ assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyElementsOf(List.of(info));
+ assertThat(chathome.getChatRoomInfo(chatRoomId)).emitsExactly(info);
assertThat(chathome
- .getChatRoom(chatRoomId)
+ .getChatRoomData(chatRoomId)
.flatMapMany(cr -> cr.getMessages())).emitsExactly(m1, m2, m3, m4);
stop();
start();
- assertThat(chathome.getChatRooms().toStream()).containsExactlyElementsOf(List.of(chatroom));
- assertThat(chathome.getChatRoom(chatRoomId)).emitsExactly(chatroom);
+ assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyElementsOf(List.of(info));
+ assertThat(chathome.getChatRoomInfo(chatRoomId)).emitsExactly(info);
assertThat(chathome
- .getChatRoom(chatRoomId)
+ .getChatRoomData(chatRoomId)
.flatMapMany(cr -> cr.getMessages())).emitsExactly(m1, m2, m3, m4);
}
{
start();
- assertThat(chathome.getChatRooms().toStream()).hasSize(0);
+ assertThat(chathome.getChatRoomInfo().toStream()).hasSize(0);
UUID chatRoomAId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
ChatRoomInfo infoA = chathome.createChatRoom(chatRoomAId, "FOO").block();
log.debug("Created chat-room {}", infoA);
- ChatRoom chatroomA = chathome.getChatRoom(chatRoomAId).block();
+ ChatRoomData chatroomA = chathome.getChatRoomData(chatRoomAId).block();
Message ma1 = chatroomA.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block();
Message ma2 = chatroomA.addMessage(1l, "ute", "Ich bin Ute...").block();
Message ma3 = chatroomA.addMessage(2l, "peter", "Willst du mit mir gehen?").block();
UUID chatRoomBId = UUID.fromString("8763dfdc-4dda-4a74-bea4-4b389177abea");
ChatRoomInfo infoB = chathome.createChatRoom(chatRoomBId, "BAR").block();
log.debug("Created chat-room {}", infoB);
- ChatRoom chatroomB = chathome.getChatRoom(chatRoomBId).block();
+ ChatRoomData chatroomB = chathome.getChatRoomData(chatRoomBId).block();
Message mb1 = chatroomB.addMessage(1l,"peter", "Hallo, ich heiße Uwe!").block();
Message mb2 = chatroomB.addMessage(1l, "ute", "Ich bin Ute...").block();
Message mb3 = chatroomB.addMessage(1l, "klaus", "Willst du mit mir gehen?").block();
Message mb4 = chatroomB.addMessage(2l, "peter", "Hä? Was jetzt?!? Isch glohb isch höb ühn däjah vüh...").block();
- assertThat(chathome.getChatRooms().toStream()).containsExactlyInAnyOrderElementsOf(List.of(chatroomA, chatroomB));
- assertThat(chathome.getChatRoom(chatRoomAId)).emitsExactly(chatroomA);
+ assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyInAnyOrderElementsOf(List.of(infoA, infoB));
+ assertThat(chathome.getChatRoomInfo(chatRoomAId)).emitsExactly(infoA);
assertThat(chathome
- .getChatRoom(chatRoomAId)
+ .getChatRoomData(chatRoomAId)
.flatMapMany(cr -> cr.getMessages())).emitsExactly(ma1, ma2, ma3, ma4);
- assertThat(chathome.getChatRoom(chatRoomBId)).emitsExactly(chatroomB);
+ assertThat(chathome.getChatRoomData(chatRoomBId)).emitsExactly(chatroomB);
assertThat(chathome
- .getChatRoom(chatRoomBId)
+ .getChatRoomData(chatRoomBId)
.flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4);
stop();
start();
- assertThat(chathome.getChatRooms().toStream()).containsExactlyInAnyOrderElementsOf(List.of(chatroomA, chatroomB));
- assertThat(chathome.getChatRoom(chatRoomAId)).emitsExactly(chatroomA);
+ assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyInAnyOrderElementsOf(List.of(infoA, infoB));
+ assertThat(chathome.getChatRoomInfo(chatRoomAId)).emitsExactly(infoA);
assertThat(chathome
- .getChatRoom(chatRoomAId)
+ .getChatRoomData(chatRoomAId)
.flatMapMany(cr -> cr.getMessages())).emitsExactly(ma1, ma2, ma3, ma4);
- assertThat(chathome.getChatRoom(chatRoomBId)).emitsExactly(chatroomB);
+ assertThat(chathome.getChatRoomInfo(chatRoomBId)).emitsExactly(infoB);
assertThat(chathome
- .getChatRoom(chatRoomBId)
+ .getChatRoomData(chatRoomBId)
.flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4);
}
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy;
-import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
storageStrategy = new FilesStorageStrategy(
path,
- clock,
- 8,
chatRoomId -> 0,
- messageFlux -> new InMemoryChatRoomService(messageFlux),
mapper);
}
package de.juplo.kafka.chat.backend.persistence;
import de.juplo.kafka.chat.backend.persistence.InMemoryWithMongoDbStorageIT.DataSourceInitializer;
-import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
import de.juplo.kafka.chat.backend.persistence.storage.mongodb.ChatRoomRepository;
+import de.juplo.kafka.chat.backend.persistence.storage.mongodb.MessageRepository;
import de.juplo.kafka.chat.backend.persistence.storage.mongodb.MongoDbStorageStrategy;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
@Autowired
MongoDbStorageStrategy storageStrategy;
@Autowired
- ChatRoomRepository repository;
+ ChatRoomRepository chatRoomRepository;
@Autowired
- Clock clock;
+ MessageRepository messageRepository;
public InMemoryWithMongoDbStorageIT()
@Bean
MongoDbStorageStrategy storageStrategy(
ChatRoomRepository chatRoomRepository,
+ MessageRepository messageRepository,
Clock clock)
{
return new MongoDbStorageStrategy(
chatRoomRepository,
- clock,
- 8,
- chatRoomId -> 0,
- messageFlux -> new InMemoryChatRoomService(messageFlux));
+ messageRepository,
+ chatRoomId -> 0);
}
@Bean
{
Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(log);
CONTAINER.followOutput(logConsumer);
- repository.deleteAll();
+ chatRoomRepository.deleteAll();
+ messageRepository.deleteAll();
}
}
.of(ownedShards())
.forEach(shard -> chatHomes[shard] = new SimpleChatHome(
shard,
- storageStrategy.read(),
+ storageStrategy,
clock,
bufferSize()));
{
return new FilesStorageStrategy(
Paths.get("target", "test-classes", "data", "files"),
- clock,
- bufferSize(),
new KafkaLikeShardingStrategy(NUM_SHARDS),
- messageFlux -> new InMemoryChatRoomService(messageFlux),
new ObjectMapper());
}
Clock clock)
{
return new SimpleChatHome(
- storageStrategy.read(),
+ storageStrategy,
clock,
bufferSize());
}
{
return new FilesStorageStrategy(
Paths.get("target", "test-classes", "data", "files"),
- clock,
- bufferSize(),
chatRoomId -> 0,
- messageFlux -> new InMemoryChatRoomService(messageFlux),
new ObjectMapper());
}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import org.junit.jupiter.api.Test;
-
-import java.time.LocalDateTime;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-
-public class MessageToTest
-{
- @Test
- void testFrom()
- {
- Message message = new Message(
- Message.MessageKey.of("ute", 1l),
- 6l,
- LocalDateTime.now(),
- "foo");
- MessageTo messageTo = MessageTo.from(message);
- assertThat(messageTo.getId()).isEqualTo("ute--1");
- }
-
- @Test
- void testToMessage()
- {
- MessageTo messageTo = new MessageTo(
- "ute--1",
- 6l,
- LocalDateTime.now().toString(),
- "foo");
- Message message = messageTo.toMessage();
- assertThat(message.getId()).isEqualTo(1l);
- assertThat(message.getUsername()).isEqualTo("ute");
- }
-}
#!/bin/bash
mongoimport --collection=chatRoomTo --db=test /docker-entrypoint-initdb.d/chatRoomTo.json
+mongoimport --collection=messageTo --db=test /docker-entrypoint-initdb.d/messageTo.json
{
"_id": "5c73531c-6fc4-426c-adcb-afc5c140a0f7",
+ "shard": 0,
"name": "FOO",
- "messages": [
- {
- "_id": "peter--1",
- "serial": 0,
- "time": "2023-01-13T20:43:16.803382151",
- "text": "Hallo, ich heiße Peter!"
- },
- {
- "_id": "ute--1",
- "serial": 1,
- "time": "2023-01-13T20:43:16.804049969",
- "text": "Ich bin Ute..."
- },
- {
- "_id": "peter--2",
- "serial": 2,
- "time": "2023-01-13T20:43:16.804092782",
- "text": "Willst du mit mir gehen?"
- },
- {
- "_id": "klaus--1",
- "serial": 3,
- "time": "2023-01-13T20:43:16.804122604",
- "text": "Ja? Nein? Vielleicht??"
- }
- ],
"_class": "de.juplo.kafka.chat.backend.persistence.storage.mongodb.ChatRoomTo"
}
--- /dev/null
+{
+ "_id": "64f7952cecf06d750cad4b9c",
+ "chatRoomId": "5c73531c-6fc4-426c-adcb-afc5c140a0f7",
+ "user": "peter",
+ "id": 1,
+ "serial": 0,
+ "time": "2023-01-13T20:43:16.803382151",
+ "text": "Hallo, ich heiße Peter!",
+ "_class": "de.juplo.kafka.chat.backend.persistence.storage.mongodb.MessageTo"
+}
+{
+ "_id": "64f7952cecf06d750cad4b9d",
+ "chatRoomId": "5c73531c-6fc4-426c-adcb-afc5c140a0f7",
+ "user": "ute",
+ "id": 1,
+ "serial": 1,
+ "time": "2023-01-13T20:43:16.804049969",
+ "text": "Ich bin Ute...",
+ "_class": "de.juplo.kafka.chat.backend.persistence.storage.mongodb.MessageTo"
+}
+{
+ "_id": "64f7952cecf06d750cad4b9e",
+ "chatRoomId": "5c73531c-6fc4-426c-adcb-afc5c140a0f7",
+ "user": "peter",
+ "id": 2,
+ "serial": 2,
+ "time": "2023-01-13T20:43:16.804092782",
+ "text": "Willst du mit mir gehen?",
+ "_class": "de.juplo.kafka.chat.backend.persistence.storage.mongodb.MessageTo"
+}
+{
+ "_id": "64f7953ddf7a9063e0b1f7dc",
+ "chatRoomId": "5c73531c-6fc4-426c-adcb-afc5c140a0f7",
+ "user": "klaus",
+ "id": 1,
+ "serial": 3,
+ "time": "2023-01-13T20:43:16.804122604",
+ "text": "Ja? Nein? Vielleicht??",
+ "_class": "de.juplo.kafka.chat.backend.persistence.storage.mongodb.MessageTo"
+}