long unseenOffset = offsets[partition];
log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
- ChatRoomLoader loader = new ChatRoomLoader(partition, currentOffset, unseenOffset);
- consumer.seek(tp, unseenOffset);
+ handlers[partition] = new ChatRoomLoadingMessageHandler(partition, currentOffset, unseenOffset);
});
}
@RequiredArgsConstructor
class NoOpMessageHandler implements MessageHandler
{
- private final int partition;
+ private final TopicPartition tp;
@Override
public MessageHandler handleMessage(Message message)
{
- log.warn("Not handling message {} for partition {}", message, partition);
+ log.warn("Not handling message {} for partition {}", message, tp);
return this;
}
}
- @RequiredArgsConstructor
- class ChatRoomLoader implements MessageHandler
+ class ChatRoomLoadingMessageHandler implements MessageHandler
{
- private final int partition;
+ private final TopicPartition tp;
private final long currentOffset;
private final long unseenOffset;
+ ChatRoomLoadingMessageHandler(TopicPartition tp, long currentOffset, long unseenOffset)
+ {
+ this.tp = tp;
+ this.currentOffset = currentOffset;
+ this.unseenOffset = unseenOffset;
+
+ consumer.seek(tp, unseenOffset);
+ }
@Override
public MessageHandler handleMessage(Message message)
return this;
}
}
+
+ @RequiredArgsConstructor
+ class DefaultMessageHandler implements MessageHandler
+ {
+ private final TopicPartition tp;
+
+ @Override
+ public MessageHandler handleMessage(Message message)
+ {
+ chatrooms[tp.partition()].put()
+ return this;
+ }
+ }
}