: throwable);
}
- public Flux<ChatRoomData> getChatRoomData()
- {
- return Flux
- .fromIterable(ownedShards)
- .flatMap(shard -> chatHomes[shard].getChatRoomData());
- }
-
-
-
private int selectShard(UUID chatroomId)
{
return shardingStrategy.selectShard(chatroomId);
.justOrEmpty(chatRoomData.get(id))
.switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
}
-
- public Flux<ChatRoomData> getChatRoomData()
- {
- return Flux.fromIterable(chatRoomData.values());
- }
}
return Mono.justOrEmpty(chatRoomInfo[shard].get(id));
}
-
- Flux<ChatRoomData> getChatRoomData()
- {
- return Flux
- .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
- .filter(shard -> isShardOwned[shard])
- .flatMap(shard -> Flux.fromIterable(chatRoomData[shard].values()));
- }
}
chatRoomChannel.getOwnedShards())));
}
- public Flux<ChatRoomData> getChatRoomData()
- {
- return chatRoomChannel.getChatRoomData();
- }
-
int selectShard(UUID chatRoomId)
{
byte[] serializedKey = chatRoomId.toString().getBytes();
package de.juplo.kafka.chat.backend.implementation.kafka.messages;
-import de.juplo.kafka.chat.backend.domain.Message;
import lombok.*;
-import java.time.LocalDateTime;
-
@Getter
@Setter
}
- public Message toMessage(long offset, LocalDateTime timestamp)
- {
- return new Message(Message.MessageKey.of(user, id), offset, timestamp, text);
- }
-
- public static EventChatMessageReceivedTo from(Message message)
- {
- return EventChatMessageReceivedTo.of(
- message.getUsername(),
- message.getId(),
- message.getMessageText());
- }
-
-
public static EventChatMessageReceivedTo of(String user, Long id, String text)
{
EventChatMessageReceivedTo to = new EventChatMessageReceivedTo();