package de.juplo.kafka.chat.backend.api;
import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomData;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import lombok.RequiredArgsConstructor;
import org.springframework.http.codec.ServerSentEvent;
}
private Mono<MessageTo> put(
- ChatRoom chatroom,
+ ChatRoomData chatroom,
String username,
Long messageId,
String text)
}
private Mono<MessageTo> get(
- ChatRoom chatroom,
+ ChatRoomData chatroom,
String username,
Long messageId)
{
.flatMapMany(chatroom -> listen(chatroom));
}
- private Flux<ServerSentEvent<MessageTo>> listen(ChatRoom chatroom)
+ private Flux<ServerSentEvent<MessageTo>> listen(ChatRoomData chatroom)
{
return chatroom
.listen()
{
Mono<ChatRoomInfo> createChatRoom(UUID id, String name);
- Mono<ChatRoom> getChatRoom(UUID id);
+ Mono<ChatRoomData> getChatRoom(UUID id);
- Flux<ChatRoom> getChatRooms();
+ Flux<ChatRoomData> getChatRooms();
}
@Slf4j
-public class ChatRoom
+public class ChatRoomData
{
public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$");
private Sinks.Many<Message> sink;
- public ChatRoom(
+ public ChatRoomData(
Clock clock,
ChatRoomService service,
int bufferSize)
package de.juplo.kafka.chat.backend.persistence;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomData;
import reactor.core.publisher.Flux;
public interface StorageStrategy
{
- void write(Flux<ChatRoom> chatroomFlux);
- Flux<ChatRoom> read();
+ void write(Flux<ChatRoomData> chatroomFlux);
+ Flux<ChatRoomData> read();
}
}
@Override
- public Mono<ChatRoom> getChatRoom(UUID id)
+ public Mono<ChatRoomData> getChatRoom(UUID id)
{
int shard = selectShard(id);
return chatHomes[shard] == null
}
@Override
- public Flux<ChatRoom> getChatRooms()
+ public Flux<ChatRoomData> getChatRooms()
{
return Flux
.fromIterable(ownedShards)
public class SimpleChatHome implements ChatHome
{
private final Integer shard;
- private final Map<UUID, ChatRoom> chatRooms;
+ private final Map<UUID, ChatRoomData> chatRooms;
private final Clock clock;
private final int bufferSize;
public SimpleChatHome(
- Flux<ChatRoom> chatroomFlux,
+ Flux<ChatRoomData> chatroomFlux,
Clock clock,
int bufferSize)
{
public SimpleChatHome(
Integer shard,
- Flux<ChatRoom> chatroomFlux,
+ Flux<ChatRoomData> chatroomFlux,
Clock clock,
int bufferSize)
{
{
log.info("Creating ChatRoom with buffer-size {}", bufferSize);
ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
- ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
- chatRooms.put(id, chatRoom);
- return Mono.just(chatRoom);
+ ChatRoomData chatRoomData = new ChatRoomData(id, name, shard, clock, service, bufferSize);
+ chatRooms.put(id, chatRoomData);
+ return Mono.just(chatRoomData);
}
@Override
- public Mono<ChatRoom> getChatRoom(UUID id)
+ public Mono<ChatRoomData> getChatRoom(UUID id)
{
return Mono
.justOrEmpty(chatRooms.get(id))
}
@Override
- public Flux<ChatRoom> getChatRooms()
+ public Flux<ChatRoomData> getChatRooms()
{
return Flux.fromIterable(chatRooms.values());
}
private final boolean[] isShardOwned;
private final long[] currentOffset;
private final long[] nextOffset;
- private final Map<UUID, ChatRoom>[] chatrooms;
+ private final Map<UUID, ChatRoomData>[] chatrooms;
private boolean running;
@Getter
{
log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize);
KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
- ChatRoom chatRoom = new ChatRoom(
+ ChatRoomData chatRoomData = new ChatRoomData(
chatRoomId,
createChatRoomRequestTo.getName(),
partition,
clock,
service,
bufferSize);
- putChatRoom(chatRoom);
+ putChatRoom(chatRoomData);
}
int shard = chatRoomInfo.getShard();
log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
KafkaChatRoomService service = new KafkaChatRoomService(this, id);
- ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
- putChatRoom(chatRoom);
+ ChatRoomData chatRoomData = new ChatRoomData(id, name, shard, clock, service, bufferSize);
+ putChatRoom(chatRoomData);
}
private void loadChatMessage(
Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
- ChatRoom chatRoom = chatrooms[partition].get(chatRoomId);
+ ChatRoomData chatRoomData = chatrooms[partition].get(chatRoomId);
KafkaChatRoomService kafkaChatRoomService =
- (KafkaChatRoomService) chatRoom.getChatRoomService();
+ (KafkaChatRoomService) chatRoomData.getChatRoomService();
kafkaChatRoomService.persistMessage(message);
}
}
- private void putChatRoom(ChatRoom chatRoom)
+ private void putChatRoom(ChatRoomData chatRoomData)
{
- Integer partition = chatRoom.getShard();
- UUID chatRoomId = chatRoom.getId();
+ Integer partition = chatRoomData.getShard();
+ UUID chatRoomId = chatRoomData.getId();
if (chatrooms[partition].containsKey(chatRoomId))
{
- log.warn("Ignoring existing chat-room: " + chatRoom);
+ log.warn("Ignoring existing chat-room: " + chatRoomData);
}
else
{
log.info(
"Adding new chat-room to partition {}: {}",
partition,
- chatRoom);
+ chatRoomData);
- chatrooms[partition].put(chatRoomId, chatRoom);
+ chatrooms[partition].put(chatRoomId, chatRoomData);
}
}
.toArray();
}
- Mono<ChatRoom> getChatRoom(int shard, UUID id)
+ Mono<ChatRoomData> getChatRoom(int shard, UUID id)
{
if (loadInProgress)
{
return Mono.justOrEmpty(chatrooms[shard].get(id));
}
- Flux<ChatRoom> getChatRooms()
+ Flux<ChatRoomData> getChatRooms()
{
return Flux
.fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
package de.juplo.kafka.chat.backend.persistence.kafka;
import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomData;
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.domain.UnknownChatroomException;
import lombok.RequiredArgsConstructor;
}
@Override
- public Mono<ChatRoom> getChatRoom(UUID id)
+ public Mono<ChatRoomData> getChatRoom(UUID id)
{
int shard = selectShard(id);
return chatRoomChannel
}
@Override
- public Flux<ChatRoom> getChatRooms()
+ public Flux<ChatRoomData> getChatRooms()
{
return chatRoomChannel.getChatRooms();
}
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
import de.juplo.kafka.chat.backend.api.MessageTo;
+import de.juplo.kafka.chat.backend.domain.ChatRoomData;
import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import lombok.RequiredArgsConstructor;
@Override
- public void write(Flux<ChatRoom> chatroomFlux)
+ public void write(Flux<ChatRoomData> chatroomFlux)
{
Path path = chatroomsPath();
log.info("Writing chatrooms to {}", path);
}
@Override
- public Flux<ChatRoom> read()
+ public Flux<ChatRoomData> read()
{
JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
return Flux
{
UUID chatRoomId = infoTo.getId();
int shard = shardingStrategy.selectShard(chatRoomId);
- return new ChatRoom(
+ return new ChatRoomData(
infoTo.getId(),
infoTo.getName(),
shard,
package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomData;
import lombok.*;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
private String name;
private List<MessageTo> messages;
- public static ChatRoomTo from(ChatRoom chatroom)
+ public static ChatRoomTo from(
+ ChatRoomInfo chatRoomInfo,
+ ChatRoomData chatroom)
{
return new ChatRoomTo(
chatroom.getId().toString(),
package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+import de.juplo.kafka.chat.backend.domain.ChatRoomData;
import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
import lombok.RequiredArgsConstructor;
@Override
- public void write(Flux<ChatRoom> chatroomFlux)
+ public void write(Flux<ChatRoomData> chatroomFlux)
{
chatroomFlux
.map(ChatRoomTo::from)
}
@Override
- public Flux<ChatRoom> read()
+ public Flux<ChatRoomData> read()
{
return Flux
.fromIterable(repository.findAll())
{
UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
int shard = shardingStrategy.selectShard(chatRoomId);
- return new ChatRoom(
+ return new ChatRoomData(
chatRoomId,
chatRoomTo.getName(),
shard,
package de.juplo.kafka.chat.backend.persistence.storage.nostorage;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomData;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
return new StorageStrategy()
{
@Override
- public void write(Flux<ChatRoom> chatroomFlux) {}
+ public void write(Flux<ChatRoomData> chatroomFlux) {}
@Override
- public Flux<ChatRoom> read()
+ public Flux<ChatRoomData> read()
{
return Flux.empty();
}
LocalDateTime timeExistingMessage = LocalDateTime.parse(timeExistingMessageAsString);
String textExistingMessage = "Existing";
String textMutatedMessage = "Mutated!";
- ChatRoom chatRoom = new ChatRoom(
+ ChatRoomData chatRoomData = new ChatRoomData(
chatroomId,
"Test-ChatRoom",
0,
Clock.systemDefaultZone(),
chatRoomService, 8);
- when(chatHome.getChatRoom(eq(chatroomId))).thenReturn(Mono.just(chatRoom));
+ when(chatHome.getChatRoom(eq(chatroomId))).thenReturn(Mono.just(chatRoomData));
Message existingMessage = new Message(
key,
serialNumberExistingMessage,
Long messageId = 66l;
Message.MessageKey key = Message.MessageKey.of(user, messageId);
String textMessage = "Hallo Welt";
- ChatRoom chatRoom = new ChatRoom(
+ ChatRoomData chatRoomData = new ChatRoomData(
chatroomId,
"Test-ChatRoom",
0,
Clock.systemDefaultZone(),
chatRoomService, 8);
when(chatHome.getChatRoom(any(UUID.class)))
- .thenReturn(Mono.just(chatRoom));
+ .thenReturn(Mono.just(chatRoomData));
when(chatRoomService.getMessage(any(Message.MessageKey.class)))
.thenReturn(Mono.empty());
// Needed for readable error-reports, in case of a bug that leads to according unwanted call
UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
// When
- Mono<ChatRoom> mono = Mono
+ Mono<ChatRoomData> mono = Mono
.defer(() -> chatHome.getChatRoom(chatRoomId))
.log("testGetExistingChatroom")
.retryWhen(Retry
UUID chatRoomId = UUID.fromString("7f59ec77-832e-4a17-8d22-55ef46242c17");
// When
- Mono<ChatRoom> mono = Mono
+ Mono<ChatRoomData> mono = Mono
.defer(() -> chatHome.getChatRoom(chatRoomId))
.log("testGetNonExistentChatroom")
.retryWhen(Retry
UUID chatRoomId = UUID.fromString("4e7246a6-29ae-43ea-b56f-669c3481ac19");
// When
- Mono<ChatRoom> mono = Mono
+ Mono<ChatRoomData> mono = Mono
.defer(() -> chatHome.getChatRoom(chatRoomId))
.log("testGetChatroomForNotOwnedShard")
.retryWhen(Retry
import static pl.rzrz.assertj.reactor.Assertions.assertThat;
-public class ChatRoomTest
+public class ChatRoomDataTest
{
@Test
@DisplayName("Assert, that Mono emits expected message, if it exists")
String user = "foo";
Long messageId = 1l;
ChatRoomService chatRoomService = mock(ChatRoomService.class);
- ChatRoom chatRoom = new ChatRoom(
+ ChatRoomData chatRoomData = new ChatRoomData(
UUID.randomUUID(),
"Foo",
0,
when(chatRoomService.getMessage(any(Message.MessageKey.class))).thenReturn(Mono.just(message));
// When
- Mono<Message> mono = chatRoom.getMessage(user, messageId);
+ Mono<Message> mono = chatRoomData.getMessage(user, messageId);
// Then
assertThat(mono).emitsExactly(message);
String user = "foo";
Long messageId = 1l;
ChatRoomService chatRoomService = mock(ChatRoomService.class);
- ChatRoom chatRoom = new ChatRoom(
+ ChatRoomData chatRoomData = new ChatRoomData(
UUID.randomUUID(),
"Foo",
0,
when(chatRoomService.getMessage(any(Message.MessageKey.class))).thenReturn(Mono.empty());
// When
- Mono<Message> mono = chatRoom.getMessage(user, messageId);
+ Mono<Message> mono = chatRoomData.getMessage(user, messageId);
// Then
assertThat(mono).emitsCount(0);
String user = "foo";
Long messageId = 1l;
ChatRoomService chatRoomService = mock(ChatRoomService.class);
- ChatRoom chatRoom = new ChatRoom(
+ ChatRoomData chatRoomData = new ChatRoomData(
UUID.randomUUID(),
"Foo",
0,
when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(Mono.just(message));
// When
- Mono<Message> mono = chatRoom.addMessage(messageId, user, messageText);
+ Mono<Message> mono = chatRoomData.addMessage(messageId, user, messageText);
// Then
assertThat(mono).emitsExactly(message);
String user = "foo";
Long messageId = 1l;
ChatRoomService chatRoomService = mock(ChatRoomService.class);
- ChatRoom chatRoom = new ChatRoom(
+ ChatRoomData chatRoomData = new ChatRoomData(
UUID.randomUUID(),
"Foo",
0,
when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(Mono.just(message));
// When
- Mono<Message> mono = chatRoom.addMessage(messageId, user, messageText);
+ Mono<Message> mono = chatRoomData.addMessage(messageId, user, messageText);
// Then
assertThat(mono).emitsExactly(message);
String user = "foo";
Long messageId = 1l;
ChatRoomService chatRoomService = mock(ChatRoomService.class);
- ChatRoom chatRoom = new ChatRoom(
+ ChatRoomData chatRoomData = new ChatRoomData(
UUID.randomUUID(),
"Foo",
0,
when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(Mono.just(message));
// When
- Mono<Message> mono = chatRoom.addMessage(messageId, user, mutatedText);
+ Mono<Message> mono = chatRoomData.addMessage(messageId, user, mutatedText);
// Then
assertThat(mono).sendsError();
UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
ChatRoomInfo info = chathome.createChatRoom(chatRoomId, "FOO").block();
log.debug("Created chat-room {}", info);
- ChatRoom chatroom = chathome.getChatRoom(chatRoomId).block();
+ ChatRoomData chatroom = chathome.getChatRoom(chatRoomId).block();
Message m1 = chatroom.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block();
Message m2 = chatroom.addMessage(1l, "ute", "Ich bin Ute...").block();
Message m3 = chatroom.addMessage(2l, "peter", "Willst du mit mir gehen?").block();
UUID chatRoomAId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
ChatRoomInfo infoA = chathome.createChatRoom(chatRoomAId, "FOO").block();
log.debug("Created chat-room {}", infoA);
- ChatRoom chatroomA = chathome.getChatRoom(chatRoomAId).block();
+ ChatRoomData chatroomA = chathome.getChatRoom(chatRoomAId).block();
Message ma1 = chatroomA.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block();
Message ma2 = chatroomA.addMessage(1l, "ute", "Ich bin Ute...").block();
Message ma3 = chatroomA.addMessage(2l, "peter", "Willst du mit mir gehen?").block();
UUID chatRoomBId = UUID.fromString("8763dfdc-4dda-4a74-bea4-4b389177abea");
ChatRoomInfo infoB = chathome.createChatRoom(chatRoomBId, "BAR").block();
log.debug("Created chat-room {}", infoB);
- ChatRoom chatroomB = chathome.getChatRoom(chatRoomBId).block();
+ ChatRoomData chatroomB = chathome.getChatRoom(chatRoomBId).block();
Message mb1 = chatroomB.addMessage(1l,"peter", "Hallo, ich heiße Uwe!").block();
Message mb2 = chatroomB.addMessage(1l, "ute", "Ich bin Ute...").block();
Message mb3 = chatroomB.addMessage(1l, "klaus", "Willst du mit mir gehen?").block();