public void onExit()
{
for (int shard = 0; shard < chatHomes.length; shard++)
- storageStrategy.write(chatHomes[shard].getChatRooms());
+ storageStrategy.writeChatRoomData(chatHomes[shard].getChatRoomData());
}
public static void main(String[] args)
public Flux<ChatRoomInfoTo> list()
{
return chatHome
- .getChatRooms()
- .map(chatroom -> ChatRoomInfoTo.from(chatroom));
+ .getChatRoomInfo()
+ .map(chatroomInfo -> ChatRoomInfoTo.from(chatroomInfo));
}
- @GetMapping("{chatroomId}/list")
- public Flux<MessageTo> list(@PathVariable UUID chatroomId)
+ @GetMapping("{chatRoomId}/list")
+ public Flux<MessageTo> list(@PathVariable UUID chatRoomId)
{
return chatHome
- .getChatRoom(chatroomId)
- .flatMapMany(chatroom -> chatroom
+ .getChatRoomData(chatRoomId)
+ .flatMapMany(chatRoomData -> chatRoomData
.getMessages()
.map(MessageTo::from));
}
- @GetMapping("{chatroomId}")
- public Mono<ChatRoomInfoTo> get(@PathVariable UUID chatroomId)
+ @GetMapping("{chatRoomId}")
+ public Mono<ChatRoomInfoTo> get(@PathVariable UUID chatRoomId)
{
return chatHome
- .getChatRoom(chatroomId)
- .map(chatroom -> ChatRoomInfoTo.from(chatroom));
+ .getChatRoomInfo(chatRoomId)
+ .map(chatRoomInfo -> ChatRoomInfoTo.from(chatRoomInfo));
}
- @PutMapping("{chatroomId}/{username}/{messageId}")
+ @PutMapping("{chatRoomId}/{username}/{messageId}")
public Mono<MessageTo> put(
- @PathVariable UUID chatroomId,
+ @PathVariable UUID chatRoomId,
@PathVariable String username,
@PathVariable Long messageId,
@RequestBody String text)
{
return
chatHome
- .getChatRoom(chatroomId)
- .flatMap(chatroom -> put(chatroom, username, messageId, text));
+ .getChatRoomData(chatRoomId)
+ .flatMap(chatRoomData -> put(chatRoomData, username, messageId, text));
}
private Mono<MessageTo> put(
- ChatRoomData chatroom,
+ ChatRoomData chatRoomData,
String username,
Long messageId,
String text)
{
return
- chatroom
+ chatRoomData
.addMessage(
messageId,
username,
.map(message -> MessageTo.from(message));
}
- @GetMapping("{chatroomId}/{username}/{messageId}")
+ @GetMapping("{chatRoomId}/{username}/{messageId}")
public Mono<MessageTo> get(
- @PathVariable UUID chatroomId,
+ @PathVariable UUID chatRoomId,
@PathVariable String username,
@PathVariable Long messageId)
{
return
chatHome
- .getChatRoom(chatroomId)
- .flatMap(chatroom -> get(chatroom, username, messageId));
+ .getChatRoomData(chatRoomId)
+ .flatMap(chatRoomData -> get(chatRoomData, username, messageId));
}
private Mono<MessageTo> get(
- ChatRoomData chatroom,
+ ChatRoomData chatRoomData,
String username,
Long messageId)
{
return
- chatroom
+ chatRoomData
.getMessage(username, messageId)
.map(message -> MessageTo.from(message));
}
- @GetMapping(path = "{chatroomId}/listen")
- public Flux<ServerSentEvent<MessageTo>> listen(@PathVariable UUID chatroomId)
+ @GetMapping(path = "{chatRoomId}/listen")
+ public Flux<ServerSentEvent<MessageTo>> listen(@PathVariable UUID chatRoomId)
{
return chatHome
- .getChatRoom(chatroomId)
- .flatMapMany(chatroom -> listen(chatroom));
+ .getChatRoomData(chatRoomId)
+ .flatMapMany(chatRoomData -> listen(chatRoomData));
}
- private Flux<ServerSentEvent<MessageTo>> listen(ChatRoomData chatroom)
+ private Flux<ServerSentEvent<MessageTo>> listen(ChatRoomData chatRoomData)
{
- return chatroom
+ return chatRoomData
.listen()
.log()
.map(message -> MessageTo.from(message))
@PostMapping("/store")
public void store()
{
- storageStrategy.write(chatHome.getChatRooms());
+ storageStrategy.writeChatRoomInfo(chatHome.getChatRoomInfo());
+ storageStrategy.writeChatRoomData(chatHome.getChatRoomData());
}
}
{
Mono<ChatRoomInfo> createChatRoom(UUID id, String name);
- Mono<ChatRoomData> getChatRoom(UUID id);
+ Mono<ChatRoomInfo> getChatRoomInfo(UUID id);
- Flux<ChatRoomData> getChatRooms();
+ Flux<ChatRoomInfo> getChatRoomInfo();
+
+ Mono<ChatRoomData> getChatRoomData(UUID id);
+
+ Flux<ChatRoomData> getChatRoomData();
}
{
public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$");
- private final Clock clock;
private final ChatRoomService service;
+ private final Clock clock;
private final int bufferSize;
private Sinks.Many<Message> sink;
public ChatRoomData(
- Clock clock,
ChatRoomService service,
+ Clock clock,
int bufferSize)
{
log.info("Created ChatRoom with buffer-size {}", bufferSize);
- this.clock = clock;
this.service = service;
+ this.clock = clock;
this.bufferSize = bufferSize;
// @RequiredArgsConstructor unfortunately not possible, because
// the `bufferSize` is not set, if `createSink()` is called
package de.juplo.kafka.chat.backend.persistence;
import de.juplo.kafka.chat.backend.domain.ChatRoomData;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import reactor.core.publisher.Flux;
public interface StorageStrategy
{
- void write(Flux<ChatRoomData> chatroomFlux);
- Flux<ChatRoomData> read();
+ void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
+ Flux<ChatRoomInfo> readChatRoomInfo();
+ void writeChatRoomData(Flux<ChatRoomData> chatRoomDataFlux);
+ Flux<ChatRoomData> readChatRoomData();
}
Clock clock)
{
return new SimpleChatHome(
- storageStrategy.read(),
+ storageStrategy.readChatRoomData(),
clock,
properties.getChatroomBufferSize());
}
.of(properties.getInmemory().getOwnedShards())
.forEach(shard -> chatHomes[shard] = new SimpleChatHome(
shard,
- storageStrategy.read(),
+ storageStrategy.readChatRoomData(),
clock,
properties.getChatroomBufferSize()));
ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
}
@Override
- public Mono<ChatRoomData> getChatRoom(UUID id)
+ public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
{
int shard = selectShard(id);
return chatHomes[shard] == null
? Mono.error(new ShardNotOwnedException(shard))
: chatHomes[shard]
- .getChatRoom(id)
+ .getChatRoomInfo(id)
.onErrorMap(throwable -> throwable instanceof UnknownChatroomException
? new UnknownChatroomException(
id,
}
@Override
- public Flux<ChatRoomData> getChatRooms()
+ public Flux<ChatRoomInfo> getChatRoomInfo()
{
return Flux
.fromIterable(ownedShards)
- .flatMap(shard -> chatHomes[shard].getChatRooms());
+ .flatMap(shard -> chatHomes[shard].getChatRoomInfo());
}
+ @Override
+ public Mono<ChatRoomData> getChatRoomData(UUID id)
+ {
+ int shard = selectShard(id);
+ return chatHomes[shard] == null
+ ? Mono.error(new ShardNotOwnedException(shard))
+ : chatHomes[shard]
+ .getChatRoomData(id)
+ .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
+ ? new UnknownChatroomException(
+ id,
+ shard,
+ ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
+ : throwable);
+ }
+
+ @Override
+ public Flux<ChatRoomData> getChatRoomData()
+ {
+ return Flux
+ .fromIterable(ownedShards)
+ .flatMap(shard -> chatHomes[shard].getChatRoomData());
+ }
+
+
private int selectShard(UUID chatroomId)
{
public class SimpleChatHome implements ChatHome
{
private final Integer shard;
- private final Map<UUID, ChatRoomData> chatRooms;
+ private final Map<UUID, ChatRoomInfo> chatRoomInfo;
+ private final Map<UUID, ChatRoomData> chatRoomData;
private final Clock clock;
private final int bufferSize;
public SimpleChatHome(
- Flux<ChatRoomData> chatroomFlux,
+ Flux<ChatRoomInfo> chatRoomInfoFlux,
+ ChatRoomService chatRoomService,
Clock clock,
int bufferSize)
{
- this(null, chatroomFlux, clock, bufferSize);
+ this(
+ null,
+ chatRoomInfoFlux,
+ chatRoomService,
+ clock,
+ bufferSize);
}
public SimpleChatHome(
Integer shard,
- Flux<ChatRoomData> chatroomFlux,
+ Flux<ChatRoomInfo> chatRoomInfoFlux,
+ ChatRoomService chatRoomService,
Clock clock,
int bufferSize)
{
log.info("Created SimpleChatHome for shard {}", shard);
;
this.shard = shard;
- this.chatRooms = new HashMap<>();
- chatroomFlux
- .filter(chatRoom ->
+ this.chatRoomInfo = new HashMap<>();
+ this.chatRoomData = new HashMap<>();
+ chatRoomInfoFlux
+ .filter(info ->
{
- if (shard == null || chatRoom.getShard() == shard)
+ if (shard == null || info.getShard() == shard)
{
return true;
}
log.info(
"SimpleChatHome for shard {} ignores not owned chat-room {}",
shard,
- chatRoom);
+ info);
return false;
}
})
.toStream()
- .forEach(chatroom -> chatRooms.put(chatroom.getId(), chatroom));
+ .forEach(info ->
+ {
+ chatRoomInfo.put(info.getId(), info);
+ chatRoomData.put(
+ info.getId(),
+ new ChatRoomData(chatRoomService, clock, bufferSize));
+ });
this.clock = clock;
this.bufferSize = bufferSize;
}
{
log.info("Creating ChatRoom with buffer-size {}", bufferSize);
ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
- ChatRoomData chatRoomData = new ChatRoomData(id, name, shard, clock, service, bufferSize);
- chatRooms.put(id, chatRoomData);
- return Mono.just(chatRoomData);
+ ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
+ ChatRoomData chatRoomData = new ChatRoomData(service, clock, bufferSize);
+ this.chatRoomData.put(id, chatRoomData);
+ return Mono.just(chatRoomInfo);
+ }
+
+ @Override
+ public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
+ {
+ return Mono
+ .justOrEmpty(chatRoomInfo.get(id))
+ .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
+ }
+
+ @Override
+ public Flux<ChatRoomInfo> getChatRoomInfo()
+ {
+ return Flux.fromIterable(chatRoomInfo.values());
}
@Override
- public Mono<ChatRoomData> getChatRoom(UUID id)
+ public Mono<ChatRoomData> getChatRoomData(UUID id)
{
return Mono
- .justOrEmpty(chatRooms.get(id))
+ .justOrEmpty(chatRoomData.get(id))
.switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
}
@Override
- public Flux<ChatRoomData> getChatRooms()
+ public Flux<ChatRoomData> getChatRoomData()
{
- return Flux.fromIterable(chatRooms.values());
+ return Flux.fromIterable(chatRoomData.values());
}
}
log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize);
KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
ChatRoomData chatRoomData = new ChatRoomData(
- chatRoomId,
- createChatRoomRequestTo.getName(),
- partition,
clock,
service,
bufferSize);
int shard = chatRoomInfo.getShard();
log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
KafkaChatRoomService service = new KafkaChatRoomService(this, id);
- ChatRoomData chatRoomData = new ChatRoomData(id, name, shard, clock, service, bufferSize);
+ ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
putChatRoom(chatRoomData);
}
}
@Override
- public Mono<ChatRoomData> getChatRoom(UUID id)
+ public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
{
int shard = selectShard(id);
return chatRoomChannel
chatRoomChannel.getOwnedShards())));
}
- int selectShard(UUID chatRoomId)
+ @Override
+ public Flux<ChatRoomInfo> getChatRoomInfo()
{
- byte[] serializedKey = chatRoomId.toString().getBytes();
- return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
+ return chatRoomChannel.getChatRooms();
}
@Override
- public Flux<ChatRoomData> getChatRooms()
+ public Mono<ChatRoomData> getChatRoomData(UUID id)
+ {
+ int shard = selectShard(id);
+ return chatRoomChannel
+ .getChatRoom(shard, id)
+ .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
+ id,
+ shard,
+ chatRoomChannel.getOwnedShards())));
+ }
+
+ @Override
+ public Flux<ChatRoomData> getChatRoomData()
{
return chatRoomChannel.getChatRooms();
}
+
+ int selectShard(UUID chatRoomId)
+ {
+ byte[] serializedKey = chatRoomId.toString().getBytes();
+ return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
+ }
}
@Override
- public void write(Flux<ChatRoomData> chatroomFlux)
+ public void writeChatRoomData(Flux<ChatRoomData> chatRoomDataFlux)
{
Path path = chatroomsPath();
log.info("Writing chatrooms to {}", path);
.getFactory()
.createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING));
- chatroomFlux
+ chatRoomDataFlux
.log()
.doFirst(() ->
{
}
@Override
- public Flux<ChatRoomData> read()
+ public Flux<ChatRoomData> readChatRoomData()
{
JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
return Flux
UUID chatRoomId = infoTo.getId();
int shard = shardingStrategy.selectShard(chatRoomId);
return new ChatRoomData(
- infoTo.getId(),
- infoTo.getName(),
- shard,
clock,
factory.create(readMessages(infoTo)),
bufferSize);
package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
import de.juplo.kafka.chat.backend.domain.ChatRoomData;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import lombok.*;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
public static ChatRoomTo from(
ChatRoomInfo chatRoomInfo,
- ChatRoomData chatroom)
+ ChatRoomData chatRoomData)
{
return new ChatRoomTo(
- chatroom.getId().toString(),
- chatroom.getName(),
- chatroom
+ chatRoomInfo.getId().toString(),
+ chatRoomInfo.getName(),
+ chatRoomData
.getMessages()
.map(MessageTo::from)
.collectList()
@Override
- public void write(Flux<ChatRoomData> chatroomFlux)
+ public void writeChatRoomData(Flux<ChatRoomData> chatRoomDataFlux)
{
- chatroomFlux
+ chatRoomDataFlux
.map(ChatRoomTo::from)
.subscribe(chatroomTo -> repository.save(chatroomTo));
}
@Override
- public Flux<ChatRoomData> read()
+ public Flux<ChatRoomData> readChatRoomData()
{
return Flux
.fromIterable(repository.findAll())
UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
int shard = shardingStrategy.selectShard(chatRoomId);
return new ChatRoomData(
- chatRoomId,
- chatRoomTo.getName(),
- shard,
clock,
factory.create(
Flux
package de.juplo.kafka.chat.backend.persistence.storage.nostorage;
import de.juplo.kafka.chat.backend.domain.ChatRoomData;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
return new StorageStrategy()
{
@Override
- public void write(Flux<ChatRoomData> chatroomFlux) {}
+ public void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux) {}
@Override
- public Flux<ChatRoomData> read()
+ public Flux<ChatRoomInfo> readChatRoomInfo()
+ {
+ return Flux.empty();
+ }
+
+ @Override
+ public void writeChatRoomData(Flux<ChatRoomData> chatRoomDataFlux) {}
+
+ @Override
+ public Flux<ChatRoomData> readChatRoomData()
{
return Flux.empty();
}
{
// Given
UUID chatroomId = UUID.randomUUID();
- when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
+ when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
// When
WebTestClient.ResponseSpec responseSpec = client
{
// Given
UUID chatroomId = UUID.randomUUID();
- when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
+ when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
// When
WebTestClient.ResponseSpec responseSpec = client
UUID chatroomId = UUID.randomUUID();
String username = "foo";
Long messageId = 66l;
- when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
+ when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
// When
WebTestClient.ResponseSpec responseSpec = client
UUID chatroomId = UUID.randomUUID();
String username = "foo";
Long messageId = 66l;
- when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
+ when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
// When
WebTestClient.ResponseSpec responseSpec = client
{
// Given
UUID chatroomId = UUID.randomUUID();
- when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
+ when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId));
// When
WebTestClient.ResponseSpec responseSpec = client
String textExistingMessage = "Existing";
String textMutatedMessage = "Mutated!";
ChatRoomData chatRoomData = new ChatRoomData(
- chatroomId,
- "Test-ChatRoom",
- 0,
Clock.systemDefaultZone(),
- chatRoomService, 8);
- when(chatHome.getChatRoom(eq(chatroomId))).thenReturn(Mono.just(chatRoomData));
+ chatRoomService,
+ 8);
+ when(chatHome.getChatRoomData(eq(chatroomId))).thenReturn(Mono.just(chatRoomData));
Message existingMessage = new Message(
key,
serialNumberExistingMessage,
Message.MessageKey key = Message.MessageKey.of(user, messageId);
String textMessage = "Hallo Welt";
ChatRoomData chatRoomData = new ChatRoomData(
- chatroomId,
- "Test-ChatRoom",
- 0,
Clock.systemDefaultZone(),
- chatRoomService, 8);
- when(chatHome.getChatRoom(any(UUID.class)))
+ chatRoomService,
+ 8);
+ when(chatHome.getChatRoomData(any(UUID.class)))
.thenReturn(Mono.just(chatRoomData));
when(chatRoomService.getMessage(any(Message.MessageKey.class)))
.thenReturn(Mono.empty());
// Given
UUID chatroomId = UUID.randomUUID();
int shard = 666;
- when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+ when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
// When
WebTestClient.ResponseSpec responseSpec = client
// Given
UUID chatroomId = UUID.randomUUID();
int shard = 666;
- when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+ when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
// When
WebTestClient.ResponseSpec responseSpec = client
String username = "foo";
Long messageId = 66l;
int shard = 666;
- when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+ when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
// When
WebTestClient.ResponseSpec responseSpec = client
String username = "foo";
Long messageId = 66l;
int shard = 666;
- when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+ when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
// When
WebTestClient.ResponseSpec responseSpec = client
// Given
UUID chatroomId = UUID.randomUUID();
int shard = 666;
- when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
+ when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard));
// When
WebTestClient.ResponseSpec responseSpec = client
// When
Mono<ChatRoomData> mono = Mono
- .defer(() -> chatHome.getChatRoom(chatRoomId))
+ .defer(() -> chatHome.getChatRoomData(chatRoomId))
.log("testGetExistingChatroom")
.retryWhen(Retry
.backoff(5, Duration.ofSeconds(1))
// When
Mono<ChatRoomData> mono = Mono
- .defer(() -> chatHome.getChatRoom(chatRoomId))
+ .defer(() -> chatHome.getChatRoomData(chatRoomId))
.log("testGetNonExistentChatroom")
.retryWhen(Retry
.backoff(5, Duration.ofSeconds(1))
// When
Mono<ChatRoomData> mono = Mono
- .defer(() -> chatHome.getChatRoom(chatRoomId))
+ .defer(() -> chatHome.getChatRoomData(chatRoomId))
.log("testGetChatroomForNotOwnedShard")
.retryWhen(Retry
.backoff(5, Duration.ofSeconds(1))
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
-import java.util.UUID;
import static org.mockito.Mockito.*;
import static pl.rzrz.assertj.reactor.Assertions.assertThat;
Long messageId = 1l;
ChatRoomService chatRoomService = mock(ChatRoomService.class);
ChatRoomData chatRoomData = new ChatRoomData(
- UUID.randomUUID(),
- "Foo",
- 0,
Clock.systemDefaultZone(),
chatRoomService,
8);
Long messageId = 1l;
ChatRoomService chatRoomService = mock(ChatRoomService.class);
ChatRoomData chatRoomData = new ChatRoomData(
- UUID.randomUUID(),
- "Foo",
- 0,
Clock.systemDefaultZone(),
chatRoomService,
8);
Long messageId = 1l;
ChatRoomService chatRoomService = mock(ChatRoomService.class);
ChatRoomData chatRoomData = new ChatRoomData(
- UUID.randomUUID(),
- "Foo",
- 0,
Clock.systemDefaultZone(),
chatRoomService,
8);
Long messageId = 1l;
ChatRoomService chatRoomService = mock(ChatRoomService.class);
ChatRoomData chatRoomData = new ChatRoomData(
- UUID.randomUUID(),
- "Foo",
- 0,
Clock.systemDefaultZone(),
chatRoomService,
8);
Long messageId = 1l;
ChatRoomService chatRoomService = mock(ChatRoomService.class);
ChatRoomData chatRoomData = new ChatRoomData(
- UUID.randomUUID(),
- "Foo",
- 0,
Clock.systemDefaultZone(),
chatRoomService,
8);
int bufferSize = 8;
SimpleChatHome simpleChatHome = new SimpleChatHome(
- getStorageStrategy().read(),
+ getStorageStrategy().readChatRoomData(),
clock,
bufferSize);
protected void stop()
{
- getStorageStrategy().write(chathome.getChatRooms());
+ getStorageStrategy().writeChatRoomData(chathome.getChatRoomData());
}
@Test
{
start();
- assertThat(chathome.getChatRooms().toStream()).hasSize(0);
+ assertThat(chathome.getChatRoomData().toStream()).hasSize(0);
UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
ChatRoomInfo info = chathome.createChatRoom(chatRoomId, "FOO").block();
log.debug("Created chat-room {}", info);
- ChatRoomData chatroom = chathome.getChatRoom(chatRoomId).block();
+ ChatRoomData chatroom = chathome.getChatRoomData(chatRoomId).block();
Message m1 = chatroom.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block();
Message m2 = chatroom.addMessage(1l, "ute", "Ich bin Ute...").block();
Message m3 = chatroom.addMessage(2l, "peter", "Willst du mit mir gehen?").block();
Message m4 = chatroom.addMessage(1l, "klaus", "Ja? Nein? Vielleicht??").block();
- assertThat(chathome.getChatRooms().toStream()).containsExactlyElementsOf(List.of(chatroom));
- assertThat(chathome.getChatRoom(chatroom.getId())).emitsExactly(chatroom);
+ assertThat(chathome.getChatRoomData().toStream()).containsExactlyElementsOf(List.of(chatroom));
+ assertThat(chathome.getChatRoomData(chatroom.getId())).emitsExactly(chatroom);
assertThat(chathome
- .getChatRoom(chatroom.getId())
+ .getChatRoomData(chatroom.getId())
.flatMapMany(cr -> cr.getMessages())).emitsExactly(m1, m2, m3, m4);
stop();
start();
- assertThat(chathome.getChatRooms().toStream()).containsExactlyElementsOf(List.of(chatroom));
- assertThat(chathome.getChatRoom(chatroom.getId())).emitsExactly(chatroom);
+ assertThat(chathome.getChatRoomData().toStream()).containsExactlyElementsOf(List.of(chatroom));
+ assertThat(chathome.getChatRoomData(chatroom.getId())).emitsExactly(chatroom);
assertThat(chathome
- .getChatRoom(chatroom.getId())
+ .getChatRoomData(chatroom.getId())
.flatMapMany(cr -> cr.getMessages())).emitsExactly(m1, m2, m3, m4);
}
{
start();
- assertThat(chathome.getChatRooms().toStream()).hasSize(0);
+ assertThat(chathome.getChatRoomData().toStream()).hasSize(0);
UUID chatRoomAId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
ChatRoomInfo infoA = chathome.createChatRoom(chatRoomAId, "FOO").block();
log.debug("Created chat-room {}", infoA);
- ChatRoomData chatroomA = chathome.getChatRoom(chatRoomAId).block();
+ ChatRoomData chatroomA = chathome.getChatRoomData(chatRoomAId).block();
Message ma1 = chatroomA.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block();
Message ma2 = chatroomA.addMessage(1l, "ute", "Ich bin Ute...").block();
Message ma3 = chatroomA.addMessage(2l, "peter", "Willst du mit mir gehen?").block();
UUID chatRoomBId = UUID.fromString("8763dfdc-4dda-4a74-bea4-4b389177abea");
ChatRoomInfo infoB = chathome.createChatRoom(chatRoomBId, "BAR").block();
log.debug("Created chat-room {}", infoB);
- ChatRoomData chatroomB = chathome.getChatRoom(chatRoomBId).block();
+ ChatRoomData chatroomB = chathome.getChatRoomData(chatRoomBId).block();
Message mb1 = chatroomB.addMessage(1l,"peter", "Hallo, ich heiße Uwe!").block();
Message mb2 = chatroomB.addMessage(1l, "ute", "Ich bin Ute...").block();
Message mb3 = chatroomB.addMessage(1l, "klaus", "Willst du mit mir gehen?").block();
Message mb4 = chatroomB.addMessage(2l, "peter", "Hä? Was jetzt?!? Isch glohb isch höb ühn däjah vüh...").block();
- assertThat(chathome.getChatRooms().toStream()).containsExactlyInAnyOrderElementsOf(List.of(chatroomA, chatroomB));
- assertThat(chathome.getChatRoom(chatroomA.getId())).emitsExactly(chatroomA);
+ assertThat(chathome.getChatRoomData().toStream()).containsExactlyInAnyOrderElementsOf(List.of(chatroomA, chatroomB));
+ assertThat(chathome.getChatRoomData(chatroomA.getId())).emitsExactly(chatroomA);
assertThat(chathome
- .getChatRoom(chatroomA.getId())
+ .getChatRoomData(chatroomA.getId())
.flatMapMany(cr -> cr.getMessages())).emitsExactly(ma1, ma2, ma3, ma4);
- assertThat(chathome.getChatRoom(chatroomB.getId())).emitsExactly(chatroomB);
+ assertThat(chathome.getChatRoomData(chatroomB.getId())).emitsExactly(chatroomB);
assertThat(chathome
- .getChatRoom(chatroomB.getId())
+ .getChatRoomData(chatroomB.getId())
.flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4);
stop();
start();
- assertThat(chathome.getChatRooms().toStream()).containsExactlyInAnyOrderElementsOf(List.of(chatroomA, chatroomB));
- assertThat(chathome.getChatRoom(chatroomA.getId())).emitsExactly(chatroomA);
+ assertThat(chathome.getChatRoomData().toStream()).containsExactlyInAnyOrderElementsOf(List.of(chatroomA, chatroomB));
+ assertThat(chathome.getChatRoomData(chatroomA.getId())).emitsExactly(chatroomA);
assertThat(chathome
- .getChatRoom(chatroomA.getId())
+ .getChatRoomData(chatroomA.getId())
.flatMapMany(cr -> cr.getMessages())).emitsExactly(ma1, ma2, ma3, ma4);
- assertThat(chathome.getChatRoom(chatroomB.getId())).emitsExactly(chatroomB);
+ assertThat(chathome.getChatRoomData(chatroomB.getId())).emitsExactly(chatroomB);
assertThat(chathome
- .getChatRoom(chatroomB.getId())
+ .getChatRoomData(chatroomB.getId())
.flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4);
}
.of(ownedShards())
.forEach(shard -> chatHomes[shard] = new SimpleChatHome(
shard,
- storageStrategy.read(),
+ storageStrategy.readChatRoomData(),
clock,
bufferSize()));
Clock clock)
{
return new SimpleChatHome(
- storageStrategy.read(),
+ storageStrategy.readChatRoomData(),
clock,
bufferSize());
}