long unseenOffset = offsets[partition];
 
       log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
-      ChatRoomLoader loader = new ChatRoomLoader(partition, currentOffset, unseenOffset);
-      consumer.seek(tp, unseenOffset);
+      handlers[partition] = new ChatRoomLoadingMessageHandler(partition, currentOffset, unseenOffset);
     });
   }
 
   @RequiredArgsConstructor
   class NoOpMessageHandler implements MessageHandler
   {
-    private final int partition;
+    private final TopicPartition tp;
 
     @Override
     public MessageHandler handleMessage(Message message)
     {
-      log.warn("Not handling message {} for partition {}", message, partition);
+      log.warn("Not handling message {} for partition {}", message, tp);
       return this;
     }
   }
 
-  @RequiredArgsConstructor
-  class ChatRoomLoader implements MessageHandler
+  class ChatRoomLoadingMessageHandler implements MessageHandler
   {
-    private final int partition;
+    private final TopicPartition tp;
     private final long currentOffset;
     private final long unseenOffset;
 
+    ChatRoomLoadingMessageHandler(TopicPartition tp, long currentOffset, long unseenOffset)
+    {
+      this.tp = tp;
+      this.currentOffset = currentOffset;
+      this.unseenOffset = unseenOffset;
+
+      consumer.seek(tp, unseenOffset);
+    }
 
     @Override
     public MessageHandler handleMessage(Message message)
       return this;
     }
   }
+
+  @RequiredArgsConstructor
+  class DefaultMessageHandler implements MessageHandler
+  {
+    private final TopicPartition tp;
+
+    @Override
+    public MessageHandler handleMessage(Message message)
+    {
+      chatrooms[tp.partition()].put()
+      return this;
+    }
+  }
 }