@ -181,6 +181,67 @@ struct rbd_req_coll {
struct rbd_req_status status [ 0 ] ;
} ;
struct rbd_img_request ;
typedef void ( * rbd_img_callback_t ) ( struct rbd_img_request * ) ;
# define BAD_WHICH U32_MAX /* Good which or bad which, which? */
struct rbd_obj_request ;
typedef void ( * rbd_obj_callback_t ) ( struct rbd_obj_request * ) ;
enum obj_request_type { OBJ_REQUEST_BIO } ; /* More types to come */
struct rbd_obj_request {
const char * object_name ;
u64 offset ; /* object start byte */
u64 length ; /* bytes from offset */
struct rbd_img_request * img_request ;
struct list_head links ; /* img_request->obj_requests */
u32 which ; /* posn image request list */
enum obj_request_type type ;
struct bio * bio_list ;
struct ceph_osd_request * osd_req ;
u64 xferred ; /* bytes transferred */
u64 version ;
s32 result ;
atomic_t done ;
rbd_obj_callback_t callback ;
struct kref kref ;
} ;
struct rbd_img_request {
struct request * rq ;
struct rbd_device * rbd_dev ;
u64 offset ; /* starting image byte offset */
u64 length ; /* byte count from offset */
bool write_request ; /* false for read */
union {
struct ceph_snap_context * snapc ; /* for writes */
u64 snap_id ; /* for reads */
} ;
spinlock_t completion_lock ; /* protects next_completion */
u32 next_completion ;
rbd_img_callback_t callback ;
u32 obj_request_count ;
struct list_head obj_requests ; /* rbd_obj_request structs */
struct kref kref ;
} ;
# define for_each_obj_request(ireq, oreq) \
list_for_each_entry ( oreq , & ireq - > obj_requests , links )
# define for_each_obj_request_from(ireq, oreq) \
list_for_each_entry_from ( oreq , & ireq - > obj_requests , links )
# define for_each_obj_request_safe(ireq, oreq, n) \
list_for_each_entry_safe_reverse ( oreq , n , & ireq - > obj_requests , links )
/*
* a single io request
*/
@ -1031,6 +1092,62 @@ out_err:
return NULL ;
}
static void rbd_obj_request_get ( struct rbd_obj_request * obj_request )
{
kref_get ( & obj_request - > kref ) ;
}
static void rbd_obj_request_destroy ( struct kref * kref ) ;
static void rbd_obj_request_put ( struct rbd_obj_request * obj_request )
{
rbd_assert ( obj_request ! = NULL ) ;
kref_put ( & obj_request - > kref , rbd_obj_request_destroy ) ;
}
static void rbd_img_request_get ( struct rbd_img_request * img_request )
{
kref_get ( & img_request - > kref ) ;
}
static void rbd_img_request_destroy ( struct kref * kref ) ;
static void rbd_img_request_put ( struct rbd_img_request * img_request )
{
rbd_assert ( img_request ! = NULL ) ;
kref_put ( & img_request - > kref , rbd_img_request_destroy ) ;
}
static inline void rbd_img_obj_request_add ( struct rbd_img_request * img_request ,
struct rbd_obj_request * obj_request )
{
rbd_obj_request_get ( obj_request ) ;
obj_request - > img_request = img_request ;
list_add_tail ( & obj_request - > links , & img_request - > obj_requests ) ;
obj_request - > which = img_request - > obj_request_count + + ;
rbd_assert ( obj_request - > which ! = BAD_WHICH ) ;
}
static inline void rbd_img_obj_request_del ( struct rbd_img_request * img_request ,
struct rbd_obj_request * obj_request )
{
rbd_assert ( obj_request - > which ! = BAD_WHICH ) ;
obj_request - > which = BAD_WHICH ;
list_del ( & obj_request - > links ) ;
rbd_assert ( obj_request - > img_request = = img_request ) ;
obj_request - > callback = NULL ;
obj_request - > img_request = NULL ;
rbd_obj_request_put ( obj_request ) ;
}
static bool obj_request_type_valid ( enum obj_request_type type )
{
switch ( type ) {
case OBJ_REQUEST_BIO :
return true ;
default :
return false ;
}
}
struct ceph_osd_req_op * rbd_osd_req_op_create ( u16 opcode , . . . )
{
struct ceph_osd_req_op * op ;
@ -1395,6 +1512,26 @@ done:
return ret ;
}
static int rbd_obj_request_submit ( struct ceph_osd_client * osdc ,
struct rbd_obj_request * obj_request )
{
return ceph_osdc_start_request ( osdc , obj_request - > osd_req , false ) ;
}
static void rbd_img_request_complete ( struct rbd_img_request * img_request )
{
if ( img_request - > callback )
img_request - > callback ( img_request ) ;
else
rbd_img_request_put ( img_request ) ;
}
static void rbd_obj_request_complete ( struct rbd_obj_request * obj_request )
{
if ( obj_request - > callback )
obj_request - > callback ( obj_request ) ;
}
/*
* Request sync osd read
*/
@ -1618,6 +1755,486 @@ static int rbd_dev_do_request(struct request *rq,
return 0 ;
}
static void rbd_osd_read_callback ( struct rbd_obj_request * obj_request ,
struct ceph_osd_op * op )
{
u64 xferred ;
/*
* We support a 64 - bit length , but ultimately it has to be
* passed to blk_end_request ( ) , which takes an unsigned int .
*/
xferred = le64_to_cpu ( op - > extent . length ) ;
rbd_assert ( xferred < ( u64 ) UINT_MAX ) ;
if ( obj_request - > result = = ( s32 ) - ENOENT ) {
zero_bio_chain ( obj_request - > bio_list , 0 ) ;
obj_request - > result = 0 ;
} else if ( xferred < obj_request - > length & & ! obj_request - > result ) {
zero_bio_chain ( obj_request - > bio_list , xferred ) ;
xferred = obj_request - > length ;
}
obj_request - > xferred = xferred ;
atomic_set ( & obj_request - > done , 1 ) ;
}
static void rbd_osd_write_callback ( struct rbd_obj_request * obj_request ,
struct ceph_osd_op * op )
{
obj_request - > xferred = le64_to_cpu ( op - > extent . length ) ;
atomic_set ( & obj_request - > done , 1 ) ;
}
static void rbd_osd_req_callback ( struct ceph_osd_request * osd_req ,
struct ceph_msg * msg )
{
struct rbd_obj_request * obj_request = osd_req - > r_priv ;
struct ceph_osd_reply_head * reply_head ;
struct ceph_osd_op * op ;
u32 num_ops ;
u16 opcode ;
rbd_assert ( osd_req = = obj_request - > osd_req ) ;
rbd_assert ( ! ! obj_request - > img_request ^
( obj_request - > which = = BAD_WHICH ) ) ;
obj_request - > xferred = le32_to_cpu ( msg - > hdr . data_len ) ;
reply_head = msg - > front . iov_base ;
obj_request - > result = ( s32 ) le32_to_cpu ( reply_head - > result ) ;
obj_request - > version = le64_to_cpu ( osd_req - > r_reassert_version . version ) ;
num_ops = le32_to_cpu ( reply_head - > num_ops ) ;
WARN_ON ( num_ops ! = 1 ) ; /* For now */
op = & reply_head - > ops [ 0 ] ;
opcode = le16_to_cpu ( op - > op ) ;
switch ( opcode ) {
case CEPH_OSD_OP_READ :
rbd_osd_read_callback ( obj_request , op ) ;
break ;
case CEPH_OSD_OP_WRITE :
rbd_osd_write_callback ( obj_request , op ) ;
break ;
default :
rbd_warn ( NULL , " %s: unsupported op %hu \n " ,
obj_request - > object_name , ( unsigned short ) opcode ) ;
break ;
}
if ( atomic_read ( & obj_request - > done ) )
rbd_obj_request_complete ( obj_request ) ;
}
static struct ceph_osd_request * rbd_osd_req_create (
struct rbd_device * rbd_dev ,
bool write_request ,
struct rbd_obj_request * obj_request ,
struct ceph_osd_req_op * op )
{
struct rbd_img_request * img_request = obj_request - > img_request ;
struct ceph_snap_context * snapc = NULL ;
struct ceph_osd_client * osdc ;
struct ceph_osd_request * osd_req ;
struct timespec now ;
struct timespec * mtime ;
u64 snap_id = CEPH_NOSNAP ;
u64 offset = obj_request - > offset ;
u64 length = obj_request - > length ;
if ( img_request ) {
rbd_assert ( img_request - > write_request = = write_request ) ;
if ( img_request - > write_request )
snapc = img_request - > snapc ;
else
snap_id = img_request - > snap_id ;
}
/* Allocate and initialize the request, for the single op */
osdc = & rbd_dev - > rbd_client - > client - > osdc ;
osd_req = ceph_osdc_alloc_request ( osdc , snapc , 1 , false , GFP_ATOMIC ) ;
if ( ! osd_req )
return NULL ; /* ENOMEM */
rbd_assert ( obj_request_type_valid ( obj_request - > type ) ) ;
switch ( obj_request - > type ) {
case OBJ_REQUEST_BIO :
rbd_assert ( obj_request - > bio_list ! = NULL ) ;
osd_req - > r_bio = obj_request - > bio_list ;
bio_get ( osd_req - > r_bio ) ;
/* osd client requires "num pages" even for bio */
osd_req - > r_num_pages = calc_pages_for ( offset , length ) ;
break ;
}
if ( write_request ) {
osd_req - > r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK ;
now = CURRENT_TIME ;
mtime = & now ;
} else {
osd_req - > r_flags = CEPH_OSD_FLAG_READ ;
mtime = NULL ; /* not needed for reads */
offset = 0 ; /* These are not used... */
length = 0 ; /* ...for osd read requests */
}
osd_req - > r_callback = rbd_osd_req_callback ;
osd_req - > r_priv = obj_request ;
osd_req - > r_oid_len = strlen ( obj_request - > object_name ) ;
rbd_assert ( osd_req - > r_oid_len < sizeof ( osd_req - > r_oid ) ) ;
memcpy ( osd_req - > r_oid , obj_request - > object_name , osd_req - > r_oid_len ) ;
osd_req - > r_file_layout = rbd_dev - > layout ; /* struct */
/* osd_req will get its own reference to snapc (if non-null) */
ceph_osdc_build_request ( osd_req , offset , length , 1 , op ,
snapc , snap_id , mtime ) ;
return osd_req ;
}
static void rbd_osd_req_destroy ( struct ceph_osd_request * osd_req )
{
ceph_osdc_put_request ( osd_req ) ;
}
/* object_name is assumed to be a non-null pointer and NUL-terminated */
static struct rbd_obj_request * rbd_obj_request_create ( const char * object_name ,
u64 offset , u64 length ,
enum obj_request_type type )
{
struct rbd_obj_request * obj_request ;
size_t size ;
char * name ;
rbd_assert ( obj_request_type_valid ( type ) ) ;
size = strlen ( object_name ) + 1 ;
obj_request = kzalloc ( sizeof ( * obj_request ) + size , GFP_KERNEL ) ;
if ( ! obj_request )
return NULL ;
name = ( char * ) ( obj_request + 1 ) ;
obj_request - > object_name = memcpy ( name , object_name , size ) ;
obj_request - > offset = offset ;
obj_request - > length = length ;
obj_request - > which = BAD_WHICH ;
obj_request - > type = type ;
INIT_LIST_HEAD ( & obj_request - > links ) ;
atomic_set ( & obj_request - > done , 0 ) ;
kref_init ( & obj_request - > kref ) ;
return obj_request ;
}
static void rbd_obj_request_destroy ( struct kref * kref )
{
struct rbd_obj_request * obj_request ;
obj_request = container_of ( kref , struct rbd_obj_request , kref ) ;
rbd_assert ( obj_request - > img_request = = NULL ) ;
rbd_assert ( obj_request - > which = = BAD_WHICH ) ;
if ( obj_request - > osd_req )
rbd_osd_req_destroy ( obj_request - > osd_req ) ;
rbd_assert ( obj_request_type_valid ( obj_request - > type ) ) ;
switch ( obj_request - > type ) {
case OBJ_REQUEST_BIO :
if ( obj_request - > bio_list )
bio_chain_put ( obj_request - > bio_list ) ;
break ;
}
kfree ( obj_request ) ;
}
/*
* Caller is responsible for filling in the list of object requests
* that comprises the image request , and the Linux request pointer
* ( if there is one ) .
*/
struct rbd_img_request * rbd_img_request_create ( struct rbd_device * rbd_dev ,
u64 offset , u64 length ,
bool write_request )
{
struct rbd_img_request * img_request ;
struct ceph_snap_context * snapc = NULL ;
img_request = kmalloc ( sizeof ( * img_request ) , GFP_ATOMIC ) ;
if ( ! img_request )
return NULL ;
if ( write_request ) {
down_read ( & rbd_dev - > header_rwsem ) ;
snapc = ceph_get_snap_context ( rbd_dev - > header . snapc ) ;
up_read ( & rbd_dev - > header_rwsem ) ;
if ( WARN_ON ( ! snapc ) ) {
kfree ( img_request ) ;
return NULL ; /* Shouldn't happen */
}
}
img_request - > rq = NULL ;
img_request - > rbd_dev = rbd_dev ;
img_request - > offset = offset ;
img_request - > length = length ;
img_request - > write_request = write_request ;
if ( write_request )
img_request - > snapc = snapc ;
else
img_request - > snap_id = rbd_dev - > spec - > snap_id ;
spin_lock_init ( & img_request - > completion_lock ) ;
img_request - > next_completion = 0 ;
img_request - > callback = NULL ;
img_request - > obj_request_count = 0 ;
INIT_LIST_HEAD ( & img_request - > obj_requests ) ;
kref_init ( & img_request - > kref ) ;
rbd_img_request_get ( img_request ) ; /* Avoid a warning */
rbd_img_request_put ( img_request ) ; /* TEMPORARY */
return img_request ;
}
static void rbd_img_request_destroy ( struct kref * kref )
{
struct rbd_img_request * img_request ;
struct rbd_obj_request * obj_request ;
struct rbd_obj_request * next_obj_request ;
img_request = container_of ( kref , struct rbd_img_request , kref ) ;
for_each_obj_request_safe ( img_request , obj_request , next_obj_request )
rbd_img_obj_request_del ( img_request , obj_request ) ;
if ( img_request - > write_request )
ceph_put_snap_context ( img_request - > snapc ) ;
kfree ( img_request ) ;
}
static int rbd_img_request_fill_bio ( struct rbd_img_request * img_request ,
struct bio * bio_list )
{
struct rbd_device * rbd_dev = img_request - > rbd_dev ;
struct rbd_obj_request * obj_request = NULL ;
struct rbd_obj_request * next_obj_request ;
unsigned int bio_offset ;
u64 image_offset ;
u64 resid ;
u16 opcode ;
opcode = img_request - > write_request ? CEPH_OSD_OP_WRITE
: CEPH_OSD_OP_READ ;
bio_offset = 0 ;
image_offset = img_request - > offset ;
rbd_assert ( image_offset = = bio_list - > bi_sector < < SECTOR_SHIFT ) ;
resid = img_request - > length ;
while ( resid ) {
const char * object_name ;
unsigned int clone_size ;
struct ceph_osd_req_op * op ;
u64 offset ;
u64 length ;
object_name = rbd_segment_name ( rbd_dev , image_offset ) ;
if ( ! object_name )
goto out_unwind ;
offset = rbd_segment_offset ( rbd_dev , image_offset ) ;
length = rbd_segment_length ( rbd_dev , image_offset , resid ) ;
obj_request = rbd_obj_request_create ( object_name ,
offset , length ,
OBJ_REQUEST_BIO ) ;
kfree ( object_name ) ; /* object request has its own copy */
if ( ! obj_request )
goto out_unwind ;
rbd_assert ( length < = ( u64 ) UINT_MAX ) ;
clone_size = ( unsigned int ) length ;
obj_request - > bio_list = bio_chain_clone_range ( & bio_list ,
& bio_offset , clone_size ,
GFP_ATOMIC ) ;
if ( ! obj_request - > bio_list )
goto out_partial ;
/*
* Build up the op to use in building the osd
* request . Note that the contents of the op are
* copied by rbd_osd_req_create ( ) .
*/
op = rbd_osd_req_op_create ( opcode , offset , length ) ;
if ( ! op )
goto out_partial ;
obj_request - > osd_req = rbd_osd_req_create ( rbd_dev ,
img_request - > write_request ,
obj_request , op ) ;
rbd_osd_req_op_destroy ( op ) ;
if ( ! obj_request - > osd_req )
goto out_partial ;
/* status and version are initially zero-filled */
rbd_img_obj_request_add ( img_request , obj_request ) ;
image_offset + = length ;
resid - = length ;
}
return 0 ;
out_partial :
rbd_obj_request_put ( obj_request ) ;
out_unwind :
for_each_obj_request_safe ( img_request , obj_request , next_obj_request )
rbd_obj_request_put ( obj_request ) ;
return - ENOMEM ;
}
static void rbd_img_obj_callback ( struct rbd_obj_request * obj_request )
{
struct rbd_img_request * img_request ;
u32 which = obj_request - > which ;
bool more = true ;
img_request = obj_request - > img_request ;
rbd_assert ( img_request ! = NULL ) ;
rbd_assert ( img_request - > rq ! = NULL ) ;
rbd_assert ( which ! = BAD_WHICH ) ;
rbd_assert ( which < img_request - > obj_request_count ) ;
rbd_assert ( which > = img_request - > next_completion ) ;
spin_lock_irq ( & img_request - > completion_lock ) ;
if ( which ! = img_request - > next_completion )
goto out ;
for_each_obj_request_from ( img_request , obj_request ) {
unsigned int xferred ;
int result ;
rbd_assert ( more ) ;
rbd_assert ( which < img_request - > obj_request_count ) ;
if ( ! atomic_read ( & obj_request - > done ) )
break ;
rbd_assert ( obj_request - > xferred < = ( u64 ) UINT_MAX ) ;
xferred = ( unsigned int ) obj_request - > xferred ;
result = ( int ) obj_request - > result ;
if ( result )
rbd_warn ( NULL , " obj_request %s result %d xferred %u \n " ,
img_request - > write_request ? " write " : " read " ,
result , xferred ) ;
more = blk_end_request ( img_request - > rq , result , xferred ) ;
which + + ;
}
rbd_assert ( more ^ ( which = = img_request - > obj_request_count ) ) ;
img_request - > next_completion = which ;
out :
spin_unlock_irq ( & img_request - > completion_lock ) ;
if ( ! more )
rbd_img_request_complete ( img_request ) ;
}
static int rbd_img_request_submit ( struct rbd_img_request * img_request )
{
struct rbd_device * rbd_dev = img_request - > rbd_dev ;
struct ceph_osd_client * osdc = & rbd_dev - > rbd_client - > client - > osdc ;
struct rbd_obj_request * obj_request ;
for_each_obj_request ( img_request , obj_request ) {
int ret ;
obj_request - > callback = rbd_img_obj_callback ;
ret = rbd_obj_request_submit ( osdc , obj_request ) ;
if ( ret )
return ret ;
/*
* The image request has its own reference to each
* of its object requests , so we can safely drop the
* initial one here .
*/
rbd_obj_request_put ( obj_request ) ;
}
return 0 ;
}
static void rbd_request_fn ( struct request_queue * q )
{
struct rbd_device * rbd_dev = q - > queuedata ;
bool read_only = rbd_dev - > mapping . read_only ;
struct request * rq ;
int result ;
while ( ( rq = blk_fetch_request ( q ) ) ) {
bool write_request = rq_data_dir ( rq ) = = WRITE ;
struct rbd_img_request * img_request ;
u64 offset ;
u64 length ;
/* Ignore any non-FS requests that filter through. */
if ( rq - > cmd_type ! = REQ_TYPE_FS ) {
__blk_end_request_all ( rq , 0 ) ;
continue ;
}
spin_unlock_irq ( q - > queue_lock ) ;
/* Disallow writes to a read-only device */
if ( write_request ) {
result = - EROFS ;
if ( read_only )
goto end_request ;
rbd_assert ( rbd_dev - > spec - > snap_id = = CEPH_NOSNAP ) ;
}
/* Quit early if the snapshot has disappeared */
if ( ! atomic_read ( & rbd_dev - > exists ) ) {
dout ( " request for non-existent snapshot " ) ;
rbd_assert ( rbd_dev - > spec - > snap_id ! = CEPH_NOSNAP ) ;
result = - ENXIO ;
goto end_request ;
}
offset = ( u64 ) blk_rq_pos ( rq ) < < SECTOR_SHIFT ;
length = ( u64 ) blk_rq_bytes ( rq ) ;
result = - EINVAL ;
if ( WARN_ON ( offset & & length > U64_MAX - offset + 1 ) )
goto end_request ; /* Shouldn't happen */
result = - ENOMEM ;
img_request = rbd_img_request_create ( rbd_dev , offset , length ,
write_request ) ;
if ( ! img_request )
goto end_request ;
img_request - > rq = rq ;
result = rbd_img_request_fill_bio ( img_request , rq - > bio ) ;
if ( ! result )
result = rbd_img_request_submit ( img_request ) ;
if ( result )
rbd_img_request_put ( img_request ) ;
end_request :
spin_lock_irq ( q - > queue_lock ) ;
if ( result < 0 ) {
rbd_warn ( rbd_dev , " obj_request %s result %d \n " ,
write_request ? " write " : " read " , result ) ;
__blk_end_request_all ( rq , result ) ;
}
}
}
/*
* block device queue callback
*/
@ -1929,8 +2546,8 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
disk - > fops = & rbd_bd_ops ;
disk - > private_data = rbd_dev ;
/* init rq */
q = blk_init_queue ( rbd_rq_fn , & rbd_dev - > lock ) ;
( void ) rbd_rq_fn ; /* avoid a warning */
q = blk_init_queue ( rbd_re quest _fn , & rbd_dev - > lock ) ;
if ( ! q )
goto out_disk ;