@Slf4j
public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
{
- private Consumer<String, MessageTo>
+ private final Consumer<String, MessageTo> consumer;
+ private final String topic;
private final long[] offsets;
private final Map<UUID, ChatRoom>[] chatrooms;
- public KafkaChatHomeService(int numShards)
+ public KafkaChatHomeService(
+ Consumer<String, MessageTo> consumer,
+ String topic,
+ int numShards)
{
log.debug("Creating KafkaChatHomeService");
- this.chatrooms = new Map[numShards];
+ this.consumer = consumer;
+ this.topic = topic;
this.offsets = new long[numShards];
for (int i=0; i< numShards; i++)
this.offsets[i] = 0l;
+ this.chatrooms = new Map[numShards];
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
{
- log.info("Assigned partitions: {}", partitions);
+ consumer.endOffsets(partitions).forEach((tp, currentOffset) ->
+ {
+ if (!tp.topic().equals(topic))
+ {
+ log.warn("Ignoring unwanted TopicPartition", tp);
+ return;
+ }
+
+ int partition = tp.partition();
+ long unseenOffset = offsets[partition];
+
+ log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
+ });
}
@Override