for (int i=0; i< numShards; i++)
{
this.offsets[i] = 0l;
- this.handlers[i] = new NoOpMessageHandler(i);
+ this.handlers[i] = new MessageHandler(new TopicPartition(topic, i));
}
this.chatrooms = new Map[numShards];
}
{
if (!tp.topic().equals(topic))
{
- log.warn("Ignoring unwanted TopicPartition", tp);
+ log.warn("Ignoring partition from unwanted topic: {}", tp);
return;
}
long unseenOffset = offsets[partition];
log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
- handlers[partition] = new ChatRoomLoadingMessageHandler(partition, currentOffset, unseenOffset);
+ handlers[partition] = new ChatRoomLoadingMessageHandlingStrategy(tp, currentOffset, unseenOffset);
});
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions)
{
+ partitions.forEach(tp ->
+ {
+ if (!tp.topic().equals(topic))
+ {
+ log.warn("Ignoring partition from unwanted topic: {}", tp);
+ return;
+ }
+
+ int partition = tp.partition();
+ long unseenOffset = offsets[partition];
+
+ log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
+ });
log.info("Revoked partitions: {}", partitions);
}
}
- interface MessageHandler
+ class MessageHandler
+ {
+
+ }
+
+ interface MessageHandlingStrategy
{
- MessageHandler handleMessage(Message message);
+ MessageHandlingStrategy handleMessage(Message message);
}
@RequiredArgsConstructor
- class NoOpMessageHandler implements MessageHandler
+ class NoOpMessageHandlingStrategy implements MessageHandlingStrategy
{
private final TopicPartition tp;
@Override
- public MessageHandler handleMessage(Message message)
+ public MessageHandlingStrategy handleMessage(Message message)
{
log.warn("Not handling message {} for partition {}", message, tp);
return this;
}
}
- class ChatRoomLoadingMessageHandler implements MessageHandler
+ class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy
{
private final TopicPartition tp;
private final long currentOffset;
private final long unseenOffset;
- ChatRoomLoadingMessageHandler(TopicPartition tp, long currentOffset, long unseenOffset)
+ ChatRoomLoadingMessageHandlingStrategy(TopicPartition tp, long currentOffset, long unseenOffset)
{
this.tp = tp;
this.currentOffset = currentOffset;
}
@Override
- public MessageHandler handleMessage(Message message)
+ public MessageHandlingStrategy handleMessage(Message message)
{
// todo
return this;
}
@RequiredArgsConstructor
- class DefaultMessageHandler implements MessageHandler
+ class DefaultMessageHandlingStrategy implements MessageHandlingStrategy
{
private final TopicPartition tp;
@Override
- public MessageHandler handleMessage(Message message)
+ public MessageHandlingStrategy handleMessage(Message message)
{
chatrooms[tp.partition()].put()
return this;