import de.juplo.kafka.chat.backend.domain.ChatHomeService;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.Message;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
for (int i=0; i< numShards; i++)
{
this.offsets[i] = 0l;
- this.handlers[i] = new NoOpMessageHandler(i);
+ this.handlers[i] = new MessageHandler(consumer, new TopicPartition(topic, i));
}
this.chatrooms = new Map[numShards];
}
{
if (!tp.topic().equals(topic))
{
- log.warn("Ignoring unwanted TopicPartition", tp);
+ log.warn("Ignoring partition from unwanted topic: {}", tp);
return;
}
long unseenOffset = offsets[partition];
log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
- handlers[partition] = new ChatRoomLoadingMessageHandler(partition, currentOffset, unseenOffset);
+ handlers[partition] = new ChatRoomLoadingMessageHandlingStrategy(tp, currentOffset, unseenOffset);
});
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions)
{
+ partitions.forEach(tp ->
+ {
+ if (!tp.topic().equals(topic))
+ {
+ log.warn("Ignoring partition from unwanted topic: {}", tp);
+ return;
+ }
+
+ int partition = tp.partition();
+ long unseenOffset = offsets[partition];
+
+ log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
+ });
log.info("Revoked partitions: {}", partitions);
}
{
return Flux.fromStream(chatrooms[shard].values().stream());
}
-
-
- interface MessageHandler
- {
- MessageHandler handleMessage(Message message);
- }
-
-
- @RequiredArgsConstructor
- class NoOpMessageHandler implements MessageHandler
- {
- private final TopicPartition tp;
-
- @Override
- public MessageHandler handleMessage(Message message)
- {
- log.warn("Not handling message {} for partition {}", message, tp);
- return this;
- }
- }
-
- class ChatRoomLoadingMessageHandler implements MessageHandler
- {
- private final TopicPartition tp;
- private final long currentOffset;
- private final long unseenOffset;
-
- ChatRoomLoadingMessageHandler(TopicPartition tp, long currentOffset, long unseenOffset)
- {
- this.tp = tp;
- this.currentOffset = currentOffset;
- this.unseenOffset = unseenOffset;
-
- consumer.seek(tp, unseenOffset);
- }
-
- @Override
- public MessageHandler handleMessage(Message message)
- {
- // todo
- return this;
- }
- }
-
- @RequiredArgsConstructor
- class DefaultMessageHandler implements MessageHandler
- {
- private final TopicPartition tp;
-
- @Override
- public MessageHandler handleMessage(Message message)
- {
- chatrooms[tp.partition()].put()
- return this;
- }
- }
}