-package de.juplo.kafka.chat.backend.persistence.kafka;
+package de.juplo.kafka.chat.backend.implementation.kafka;
import de.juplo.kafka.chat.backend.domain.*;
import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
-import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
-import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
-import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
chatRoomId,
partition,
bufferSize);
- KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
+ KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
ChatRoomData chatRoomData = new ChatRoomData(
clock,
service,
{
UUID id = chatRoomInfo.getId();
log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
- KafkaChatRoomService service = new KafkaChatRoomService(this, id);
+ KafkaChatMessageService service = new KafkaChatMessageService(this, id);
ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
putChatRoom(
chatRoomInfo.getId(),
Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId);
- KafkaChatRoomService kafkaChatRoomService =
- (KafkaChatRoomService) chatRoomData.getChatRoomService();
+ KafkaChatMessageService kafkaChatRoomService =
+ (KafkaChatMessageService) chatRoomData.getChatRoomService();
kafkaChatRoomService.persistMessage(message);
}
return Mono.justOrEmpty(chatRoomInfo[shard].get(id));
}
-
- Flux<ChatRoomData> getChatRoomData()
- {
- return Flux
- .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
- .filter(shard -> isShardOwned[shard])
- .flatMap(shard -> Flux.fromIterable(chatRoomData[shard].values()));
- }
}