import de.juplo.kafka.chat.backend.domain.ChatHomeService;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
private final Consumer<String, MessageTo> consumer;
private final String topic;
private final long[] offsets;
+ private final MessageHandler[] handlers;
private final Map<UUID, ChatRoom>[] chatrooms;
this.consumer = consumer;
this.topic = topic;
this.offsets = new long[numShards];
+ this.handlers = new MessageHandler[numShards];
for (int i=0; i< numShards; i++)
+ {
this.offsets[i] = 0l;
+ this.handlers[i] = new NoOpMessageHandler(i);
+ }
this.chatrooms = new Map[numShards];
}
long unseenOffset = offsets[partition];
log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
+ ChatRoomLoader loader = new ChatRoomLoader(partition, currentOffset, unseenOffset);
consumer.seek(tp, unseenOffset);
});
}
{
return Flux.fromStream(chatrooms[shard].values().stream());
}
+
+
+ interface MessageHandler
+ {
+ MessageHandler handleMessage(Message message);
+ }
+
+
+ @RequiredArgsConstructor
+ class NoOpMessageHandler implements MessageHandler
+ {
+ private final int partition;
+
+ @Override
+ public MessageHandler handleMessage(Message message)
+ {
+ log.warn("Not handling message {} for partition {}", message, partition);
+ return this;
+ }
+ }
+
+ @RequiredArgsConstructor
+ class ChatRoomLoader implements MessageHandler
+ {
+ private final int partition;
+ private final long currentOffset;
+ private final long unseenOffset;
+
+
+ @Override
+ public MessageHandler handleMessage(Message message)
+ {
+ // todo
+ return this;
+ }
+ }
}