- Dropped `ChatHomeService.putChatRoom(ChatRoom)`.
- `ChatRoomFactory.create(UUID, String)` returns `ChatRoomInfo`.
@PostMapping("create")
- public Mono<ChatRoomTo> create(@RequestBody String name)
+ public Mono<ChatRoomInfoTo> create(@RequestBody String name)
{
UUID chatRoomId = UUID.randomUUID();
return factory
.createChatRoom(chatRoomId, name)
- .flatMap(chatRoom -> chatHome.putChatRoom(chatRoom))
- .map(ChatRoomTo::from);
+ .map(ChatRoomInfoTo::from);
}
@GetMapping("list")
- public Flux<ChatRoomTo> list()
+ public Flux<ChatRoomInfoTo> list()
{
return chatHome
.getChatRooms()
- .map(chatroom -> ChatRoomTo.from(chatroom));
+ .map(chatroom -> ChatRoomInfoTo.from(chatroom));
}
@GetMapping("{chatroomId}/list")
}
@GetMapping("{chatroomId}")
- public Mono<ChatRoomTo> get(@PathVariable UUID chatroomId)
+ public Mono<ChatRoomInfoTo> get(@PathVariable UUID chatroomId)
{
return chatHome
.getChatRoom(chatroomId)
- .map(chatroom -> ChatRoomTo.from(chatroom));
+ .map(chatroom -> ChatRoomInfoTo.from(chatroom));
}
@PutMapping("{chatroomId}/{username}/{messageId}")
--- /dev/null
+package de.juplo.kafka.chat.backend.api;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import lombok.Data;
+
+import java.util.UUID;
+
+@Data
+public class ChatRoomInfoTo
+{
+ private UUID id;
+ private String name;
+ private int shard;
+
+
+ public static ChatRoomInfoTo from(ChatRoomInfo info)
+ {
+ ChatRoomInfoTo to = new ChatRoomInfoTo();
+ to.id = info.getId();
+ to.name = info.getName();
+ to.shard = info.getShard();
+ return to;
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.api;
-
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import lombok.Data;
-
-import java.util.UUID;
-
-@Data
-public class ChatRoomTo
-{
- private UUID id;
- private String name;
- private int shard;
-
-
- public static ChatRoomTo from(ChatRoom chatroom)
- {
- ChatRoomTo to = new ChatRoomTo();
- to.id = chatroom.getId();
- to.name = chatroom.getName();
- to.shard = chatroom.getShard();
- return to;
- }
-}
public interface ChatHome
{
- Mono<ChatRoom> putChatRoom(ChatRoom chatRoom);
-
Mono<ChatRoom> getChatRoom(UUID id);
Flux<ChatRoom> getChatRooms();
public interface ChatHomeService
{
- Mono<ChatRoom> putChatRoom(ChatRoom chatRoom);
Mono<ChatRoom> getChatRoom(int shard, UUID id);
Flux<ChatRoom> getChatRooms(int shard);
}
package de.juplo.kafka.chat.backend.domain;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
-@EqualsAndHashCode(of = { "id" })
-@ToString(of = { "id", "name" })
-public class ChatRoom
+public class ChatRoom extends ChatRoomInfo
{
public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$");
- @Getter
- private final UUID id;
- @Getter
- private final String name;
- @Getter
- private final int shard;
private final Clock clock;
private final ChatRoomService service;
private final int bufferSize;
ChatRoomService service,
int bufferSize)
{
+ super(id, name, shard);
log.info("Created ChatRoom {} with buffer-size {}", id, bufferSize);
- this.id = id;
- this.name = name;
- this.shard = shard;
this.clock = clock;
this.service = service;
this.bufferSize = bufferSize;
public interface ChatRoomFactory
{
- Mono<ChatRoom> createChatRoom(UUID id, String name);
+ Mono<ChatRoomInfo> createChatRoom(UUID id, String name);
}
--- /dev/null
+package de.juplo.kafka.chat.backend.domain;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+
+import java.util.UUID;
+
+
+@RequiredArgsConstructor
+@EqualsAndHashCode(of = { "id" })
+@ToString(of = { "id", "name", "shard" })
+public class ChatRoomInfo
+{
+ @Getter
+ private final UUID id;
+ @Getter
+ private final String name;
+ @Getter
+ private final int shard;
+}
}
- @Override
- public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
- {
- return chatHomes[selectShard(chatRoom.getId())].putChatRoom(chatRoom);
- }
-
@Override
public Mono<ChatRoom> getChatRoom(UUID id)
{
}
- @Override
- public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
- {
- return service.putChatRoom(chatRoom);
- }
-
@Override
public Mono<ChatRoom> getChatRoom(UUID id)
{
.forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
}
- @Override
- public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
+ public void putChatRoom(ChatRoom chatRoom)
{
chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
- return Mono.just(chatRoom);
}
@Override
package de.juplo.kafka.chat.backend.persistence.inmemory;
-import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
-import de.juplo.kafka.chat.backend.domain.ChatRoomService;
+import de.juplo.kafka.chat.backend.domain.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class InMemoryChatRoomFactory implements ChatRoomFactory
{
+ private final InMemoryChatHomeService chatHomeService;
private final ShardingStrategy shardingStrategy;
private final Clock clock;
private final int bufferSize;
@Override
- public Mono<ChatRoom> createChatRoom(UUID id, String name)
+ public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
{
log.info("Creating ChatRoom with buffer-size {}", bufferSize);
int shard = shardingStrategy.selectShard(id);
ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
- return Mono.just(new ChatRoom(id, name, shard, clock, service, bufferSize));
+ ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
+ chatHomeService.putChatRoom(chatRoom);
+ return Mono.just(chatRoom);
}
}
@Bean
InMemoryChatRoomFactory chatRoomFactory(
+ InMemoryChatHomeService service,
ShardingStrategy strategy,
Clock clock,
ChatBackendProperties properties)
{
return new InMemoryChatRoomFactory(
+ service,
strategy,
clock,
properties.getChatroomBufferSize());
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.chat.backend.api.ChatRoomTo;
+import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo;
import de.juplo.kafka.chat.backend.api.MessageTo;
import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
{
try
{
- ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom);
- generator.writeObject(chatroomTo);
- writeMessages(chatroomTo, chatroom.getMessages());
+ ChatRoomInfoTo infoTo = ChatRoomInfoTo.from(chatroom);
+ generator.writeObject(infoTo);
+ writeMessages(infoTo, chatroom.getMessages());
}
catch (IOException e)
{
@Override
public Flux<ChatRoom> read()
{
- JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class);
+ JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class);
return Flux
- .from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
+ .from(new JsonFilePublisher<ChatRoomInfoTo>(chatroomsPath(), mapper, type))
.log()
- .map(chatRoomTo ->
+ .map(infoTo ->
{
- UUID chatRoomId = chatRoomTo.getId();
+ UUID chatRoomId = infoTo.getId();
int shard = shardingStrategy.selectShard(chatRoomId);
return new ChatRoom(
- chatRoomTo.getId(),
- chatRoomTo.getName(),
+ infoTo.getId(),
+ infoTo.getName(),
shard,
clock,
- factory.create(readMessages(chatRoomTo)),
+ factory.create(readMessages(infoTo)),
bufferSize);
});
}
- public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
+ public void writeMessages(ChatRoomInfoTo infoTo, Flux<Message> messageFlux)
{
- Path path = chatroomPath(chatroomTo);
- log.info("Writing messages for {} to {}", chatroomTo, path);
+ Path path = chatroomPath(infoTo);
+ log.info("Writing messages for {} to {}", infoTo, path);
try
{
Files.createDirectories(storagePath);
}
}
- public Flux<Message> readMessages(ChatRoomTo chatroomTo)
+ public Flux<Message> readMessages(ChatRoomInfoTo infoTo)
{
JavaType type = mapper.getTypeFactory().constructType(MessageTo.class);
return Flux
- .from(new JsonFilePublisher<MessageTo>(chatroomPath(chatroomTo), mapper, type))
+ .from(new JsonFilePublisher<MessageTo>(chatroomPath(infoTo), mapper, type))
.log()
.map(MessageTo::toMessage);
}
return storagePath.resolve(Path.of(CHATROOMS_FILENAME));
}
- Path chatroomPath(ChatRoomTo chatroomTo)
+ Path chatroomPath(ChatRoomInfoTo infoTo)
{
- return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json"));
+ return storagePath.resolve(Path.of(infoTo.getId().toString() + ".json"));
}
}
getStorageStrategy().read());
InMemoryChatRoomFactory chatRoomFactory = new InMemoryChatRoomFactory(
+ chatHomeService,
chatRoomId -> 0,
clock,
8);
assertThat(chathome.getChatRooms().toStream()).hasSize(0);
UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
- ChatRoom chatroom = chatRoomFactory.createChatRoom(chatRoomId, "FOO").block();
- chathome.putChatRoom(chatroom);
+ ChatRoomInfo info = chatRoomFactory.createChatRoom(chatRoomId, "FOO").block();
+ log.debug("Created chat-room {}", info);
+ ChatRoom chatroom = chathome.getChatRoom(chatRoomId).block();
Message m1 = chatroom.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block();
Message m2 = chatroom.addMessage(1l, "ute", "Ich bin Ute...").block();
Message m3 = chatroom.addMessage(2l, "peter", "Willst du mit mir gehen?").block();
assertThat(chathome.getChatRooms().toStream()).hasSize(0);
UUID chatRoomAId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
- ChatRoom chatroomA = chatRoomFactory.createChatRoom(chatRoomAId, "FOO").block();
- chathome.putChatRoom(chatroomA);
+ ChatRoomInfo infoA = chatRoomFactory.createChatRoom(chatRoomAId, "FOO").block();
+ log.debug("Created chat-room {}", infoA);
+ ChatRoom chatroomA = chathome.getChatRoom(chatRoomAId).block();
Message ma1 = chatroomA.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block();
Message ma2 = chatroomA.addMessage(1l, "ute", "Ich bin Ute...").block();
Message ma3 = chatroomA.addMessage(2l, "peter", "Willst du mit mir gehen?").block();
Message ma4 = chatroomA.addMessage(1l, "klaus", "Ja? Nein? Vielleicht??").block();
UUID chatRoomBId = UUID.fromString("8763dfdc-4dda-4a74-bea4-4b389177abea");
- ChatRoom chatroomB = chatRoomFactory.createChatRoom(chatRoomBId, "BAR").block();
- chathome.putChatRoom(chatroomB);
+ ChatRoomInfo infoB = chatRoomFactory.createChatRoom(chatRoomBId, "BAR").block();
+ log.debug("Created chat-room {}", infoB);
+ ChatRoom chatroomB = chathome.getChatRoom(chatRoomBId).block();
Message mb1 = chatroomB.addMessage(1l,"peter", "Hallo, ich heiße Uwe!").block();
Message mb2 = chatroomB.addMessage(1l, "ute", "Ich bin Ute...").block();
Message mb3 = chatroomB.addMessage(1l, "klaus", "Willst du mit mir gehen?").block();