package de.juplo.kafka.chat.backend.domain;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
int[] ownedShards,
Flux<ChatRoom> chatroomFlux)
{
- log.debug("Creating ChatHomeService");
+ log.debug("Creating InMemoryChatHomeService");
this.chatrooms = new Map[numShards];
Set<Integer> owned = Arrays
.stream(ownedShards)
import de.juplo.kafka.chat.backend.domain.ChatHomeService;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import java.util.UUID;
+import java.util.*;
+@Slf4j
public class KafkaChatHomeService implements ChatHomeService
{
+ private final Map<UUID, ChatRoom>[] chatrooms;
+
+
+ public KafkaChatHomeService(
+ int numShards,
+ int[] ownedShards,
+ Flux<ChatRoom> chatroomFlux)
+ {
+ log.debug("Creating ChatHomeService");
+ this.chatrooms = new Map[numShards];
+ Set<Integer> owned = Arrays
+ .stream(ownedShards)
+ .collect(
+ () -> new HashSet<>(),
+ (set, i) -> set.add(i),
+ (a, b) -> a.addAll(b));
+ for (int shard = 0; shard < numShards; shard++)
+ {
+ chatrooms[shard] = owned.contains(shard)
+ ? new HashMap<>()
+ : null;
+ }
+ chatroomFlux
+ .filter(chatRoom ->
+ {
+ if (owned.contains(chatRoom.getShard()))
+ {
+ return true;
+ }
+ else
+ {
+ log.info("Ignoring not owned chat-room {}", chatRoom);
+ return false;
+ }
+ })
+ .toStream()
+ .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
+ }
+
@Override
public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
{
- return null;
+ chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
+ return Mono.just(chatRoom);
}
@Override
public Mono<ChatRoom> getChatRoom(int shard, UUID id)
{
- return null;
+ return Mono.justOrEmpty(chatrooms[shard].get(id));
}
@Override
public Flux<ChatRoom> getChatRooms(int shard)
{
- return null;
+ return Flux.fromStream(chatrooms[shard].values().stream());
}
}
import de.juplo.kafka.chat.backend.domain.ChatRoomService;
import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.LinkedHashMap;
+@Slf4j
public class KafkaChatRoomService implements ChatRoomService
{
- private final LinkedHashMap<Message.MessageKey, Message> messages = null;
+ private final LinkedHashMap<Message.MessageKey, Message> messages;
+
+
+ public KafkaChatRoomService(Flux<Message> messageFlux)
+ {
+ log.debug("Creating KafkaChatRoomService");
+ messages = new LinkedHashMap<>();
+ messageFlux.subscribe(message -> messages.put(message.getKey(), message));
+ }
@Override
- public Message persistMessage(Message.MessageKey key, LocalDateTime timestamp, String text)
+ public Message persistMessage(
+ Message.MessageKey key,
+ LocalDateTime timestamp,
+ String text)
{
- return null;
+ Message message = new Message(key, (long)messages.size(), timestamp, text);
+ messages.put(message.getKey(), message);
+ return message;
}
@Override
public Mono<Message> getMessage(Message.MessageKey key)
{
- return null;
+ return Mono.fromSupplier(() -> messages.get(key));
}
@Override
public Flux<Message> getMessages(long first, long last)
{
- return null;
+ return Flux.fromStream(messages
+ .values()
+ .stream()
+ .filter(message ->
+ {
+ long serial = message.getSerialNumber();
+ return serial >= first && serial <= last;
+ }));
}
}