@Bean
public StorageStrategy storageStrategy(
ChatRoomRepository chatRoomRepository,
- MessageRepository messageRepository,
- ShardingStrategy shardingStrategy)
+ MessageRepository messageRepository)
{
return new MongoDbStorageStrategy(
chatRoomRepository,
- messageRepository,
- shardingStrategy);
+ messageRepository);
}
}
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.implementation.ShardingStrategy;
import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
{
private final ChatRoomRepository chatRoomRepository;
private final MessageRepository messageRepository;
- private final ShardingStrategy shardingStrategy;
@Override
{
return chatRoomInfoFlux
.map(ChatRoomTo::from)
- .map(chatroomTo -> chatRoomRepository.save(chatroomTo))
+ .map(chatRoomTo -> chatRoomRepository.save(chatRoomTo))
.map(ChatRoomTo::toChatRoomInfo);
}
@Override
public Flux<ChatRoomInfo> readChatRoomInfo()
{
- return chatRoomRepository.findAll()
+ return chatRoomRepository
+ .findAll()
.map(chatRoomTo ->
{
UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
{
return messageFlux
.map(message -> MessageTo.from(chatRoomId, message))
- .map(messageTo -> messageRepository.save(messageTo))
+ .flatMap(messageTo -> messageRepository.save(messageTo))
.map(MessageTo::toMessage);
}
@Override
public Flux<Message> readChatRoomData(UUID chatRoomId)
{
- return messageRepository.findByChatRoomIdOrderBySerialAsc(chatRoomId.toString())
+ return messageRepository
+ .findByChatRoomIdOrderBySerialAsc(chatRoomId.toString())
.map(messageTo -> messageTo.toMessage());
}
}
@Bean
MongoDbStorageStrategy storageStrategy(
ChatRoomRepository chatRoomRepository,
- MessageRepository messageRepository,
- Clock clock)
+ MessageRepository messageRepository)
{
return new MongoDbStorageStrategy(
chatRoomRepository,
- messageRepository,
- chatRoomId -> 0);
+ messageRepository);
}
@Bean