- return Flux.fromStream(chatrooms[shard].values().stream());
- }
-
-
- interface MessageHandler
- {
- MessageHandler handleMessage(Message message);
- }
-
-
- @RequiredArgsConstructor
- class NoOpMessageHandler implements MessageHandler
- {
- private final TopicPartition tp;
-
- @Override
- public MessageHandler handleMessage(Message message)
- {
- log.warn("Not handling message {} for partition {}", message, tp);
- return this;
- }
- }
-
- class ChatRoomLoadingMessageHandler implements MessageHandler
- {
- private final TopicPartition tp;
- private final long currentOffset;
- private final long unseenOffset;
-
- ChatRoomLoadingMessageHandler(TopicPartition tp, long currentOffset, long unseenOffset)
- {
- this.tp = tp;
- this.currentOffset = currentOffset;
- this.unseenOffset = unseenOffset;
-
- consumer.seek(tp, unseenOffset);
- }
-
- @Override
- public MessageHandler handleMessage(Message message)