Qemu rbd 미 러 소스 분석 열기

10608 단어
Ceph 0.94.1
Qemu 2.4.0
rbd.c
qemu 에서rbd_open 함수 에서 중요 한 데이터 구 조 를 작 동 했 습 니 다. 구조 체 BDRVRBD State 는 매우 중요 한 정 보 를 저장 합 니 다.
typedef struct BDRVRBDState {
    rados_t cluster;        //cluster handle
    rados_ioctx_t io_ctx;    //cluster IO   
    rbd_image_t image;    //rbd      
    char name[RBD_MAX_IMAGE_NAME_SIZE];  //rbd     
    char *snap;    //rbd       
} BDRVRBDState;

대 qemurbd_open 함수 분석:
static int qemu_rbd_open(BlockDriverState *bs, QDict *options, int flags,
                         Error **errp)
{
    BDRVRBDState *s = bs->opaque;
    char pool[RBD_MAX_POOL_NAME_SIZE];
    char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
    char conf[RBD_MAX_CONF_SIZE];
    char clientname_buf[RBD_MAX_CONF_SIZE];
    char *clientname;
    QemuOpts *opts;
    Error *local_err = NULL;
    const char *filename;
    int r;
 /*         */
    opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
    qemu_opts_absorb_qdict(opts, options, &local_err);
    if (local_err) {
        error_propagate(errp, local_err);
        qemu_opts_del(opts);
        return -EINVAL;
    }
    filename = qemu_opt_get(opts, "filename");
	
 /*  filename,      */
    if (qemu_rbd_parsename(filename, pool, sizeof(pool),
                           snap_buf, sizeof(snap_buf),
                           s->name, sizeof(s->name),
                           conf, sizeof(conf), errp) < 0) {
        r = -EINVAL;
        goto failed_opts;
    }
 /*
 pool:pool   
 snap_buf:       
 s->name:rbd     
 conf:     
 */
	
    clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
 /*client_buf:client   */
    r = rados_create(&s->cluster, clientname);	 //  cluster handle
    if (r < 0) {
        error_setg(errp, "error initializing");
        goto failed_opts;
    }
	
 /*  snap_buf     s->snap*/
    s->snap = NULL;
    if (snap_buf[0] != '\0') {
        s->snap = g_strdup(snap_buf);
    }
	
 /*      ,               */
    if (strstr(conf, "conf=") == NULL) {
        /* try default location, but ignore failure */
        rados_conf_read_file(s->cluster, NULL);
    } else if (conf[0] != '\0') {
        r = qemu_rbd_set_conf(s->cluster, conf, true, errp);
        if (r < 0) {
            goto failed_shutdown;
        }
    }
 /*           */
    if (conf[0] != '\0') {
        r = qemu_rbd_set_conf(s->cluster, conf, false, errp);
        if (r < 0) {
            goto failed_shutdown;
        }
    }
    /*
     * Fallback to more conservative semantics if setting cache
     * options fails. Ignore errors from setting rbd_cache because the
     * only possible error is that the option does not exist, and
     * librbd defaults to no caching. If write through caching cannot
     * be set up, fall back to no caching.
     */
 /*  cache   */ 
    if (flags & BDRV_O_NOCACHE) {
        rados_conf_set(s->cluster, "rbd_cache", "false");
    } else {
        rados_conf_set(s->cluster, "rbd_cache", "true");
    }
    r = rados_connect(s->cluster);	 //  cluster
    if (r < 0) {
        error_setg(errp, "error connecting");
        goto failed_shutdown;
    }
    r = rados_ioctx_create(s->cluster, pool, &s->io_ctx);	//  IO   
    if (r < 0) {
        error_setg(errp, "error opening pool %s", pool);
        goto failed_shutdown;
    }
    r = rbd_open(s->io_ctx, s->name, &s->image, s->snap);	 //    IO       ,  rbd  
    if (r < 0) {
        error_setg(errp, "error reading header from %s", s->name);
        goto failed_open;
    }
    bs->read_only = (s->snap != NULL);
    qemu_opts_del(opts);
    return 0;
failed_open:
    rados_ioctx_destroy(s->io_ctx);
failed_shutdown:
    rados_shutdown(s->cluster);
    g_free(s->snap);
failed_opts:
    qemu_opts_del(opts);
    return r;
}

보 이 는 바 와 같이 qemu 를 완성 하고 있 습 니 다.rbd_open 함수 이후 rbd 미 러 에 대한 정 보 는 bs - > opaque 에 저장 되 어 다른 함수 에 의 해 이 용 될 수 있 습 니 다.
qemu_rbd_close 함 수 는 매우 간단 해서 여기에 붙 이지 않 습 니 다.
qemu rbd 에 대한 읽 기와 쓰기 작업 은 모두 비동기 적 인 방식 으로 진행 되 는 것 같 습 니 다.읽 기, 쓰기, flush 함수 에 대한 분석 은 다음 과 같다.
/*     */
static BlockAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
                                      int64_t sector_num,
                                      QEMUIOVector *qiov,
                                      int nb_sectors,
                                      BlockCompletionFunc *cb,
                                      void *opaque)
{
    return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque,
                         RBD_AIO_READ);
}
/*     */
static BlockAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs,
                                       int64_t sector_num,
                                       QEMUIOVector *qiov,
                                       int nb_sectors,
                                       BlockCompletionFunc *cb,
                                       void *opaque)
{
    return rbd_start_aio(bs, sector_num, qiov, nb_sectors, cb, opaque,
                         RBD_AIO_WRITE);
}
/*flush    */
#ifdef LIBRBD_SUPPORTS_AIO_FLUSH
static BlockAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs,
                                      BlockCompletionFunc *cb,
                                      void *opaque)
{
    return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH);
}
#else
static int qemu_rbd_co_flush(BlockDriverState *bs)
{
#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
    /* rbd_flush added in 0.1.1 */
    BDRVRBDState *s = bs->opaque;
    return rbd_flush(s->image);
#else
    return 0;
#endif
}
#endif
#ifdef LIBRBD_SUPPORTS_DISCARD
static BlockAIOCB* qemu_rbd_aio_discard(BlockDriverState *bs,
                                        int64_t sector_num,
                                        int nb_sectors,
                                        BlockCompletionFunc *cb,
                                        void *opaque)
{
    return rbd_start_aio(bs, sector_num, NULL, nb_sectors, cb, opaque,
                         RBD_AIO_DISCARD);
}
#endif

이 를 통 해 알 수 있 듯 이 읽 기와 쓰기, flush 는 모두 rbd 를 통 해start_ao 이 함수 로 완성 합 니 다.물론 librbd 가 비동기 flush 를 지원 하 는 상황 입 니 다.그럼 rbdstart_아이 오 는 또 어떤 가요?
/*    IO     */
/*       ,sector_num, qiov, nb_sectors, cb     qemu         */
static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
                                 int64_t sector_num,
                                 QEMUIOVector *qiov,
                                 int nb_sectors,
                                 BlockCompletionFunc *cb,
                                 void *opaque,
                                 RBDAIOCmd cmd)
{
    RBDAIOCB *acb;
    RADOSCB *rcb = NULL;
    rbd_completion_t c;
    int64_t off, size;
    char *buf;
    int r;
    BDRVRBDState *s = bs->opaque;
    acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
    acb->cmd = cmd;
    acb->qiov = qiov;
    if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
        acb->bounce = NULL;
    } else {
        acb->bounce = qemu_try_blockalign(bs, qiov->size);
        if (acb->bounce == NULL) {
            goto failed;
        }
    }
    acb->ret = 0;
    acb->error = 0;
    acb->s = s;
    acb->bh = NULL;
    if (cmd == RBD_AIO_WRITE) {
        qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
    }
    buf = acb->bounce;
    off = sector_num * BDRV_SECTOR_SIZE;
    size = nb_sectors * BDRV_SECTOR_SIZE;
    rcb = g_new(RADOSCB, 1);
    rcb->acb = acb;
    rcb->buf = buf;
    rcb->s = acb->s;
    rcb->size = size;
    r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
    if (r < 0) {
        goto failed;
    }
 /*acb rcb       IO       ,     IO   ,           ,    ,      。           IO   */
	
 /*  command   ,          */
    switch (cmd) {
    case RBD_AIO_WRITE:
        r = rbd_aio_write(s->image, off, size, buf, c);
        break;
    case RBD_AIO_READ:
        r = rbd_aio_read(s->image, off, size, buf, c);
        break;
    case RBD_AIO_DISCARD:
        r = rbd_aio_discard_wrapper(s->image, off, size, c);
        break;
    case RBD_AIO_FLUSH:
        r = rbd_aio_flush_wrapper(s->image, c);
        break;
    default:
        r = -EINVAL;
    }
    if (r < 0) {
        goto failed_completion;
    }
    return &acb->common;
failed_completion:
    rbd_aio_release(c);
failed:
    g_free(rcb);
    qemu_vfree(acb->bounce);
    qemu_aio_unref(acb);
    return NULL;
}

rbd. c 에 대한 분석 에서 우 리 는 읽 든 쓰 든 마지막 에 image 에 대한 조작 을 볼 수 있다.
그럼 rbdopen 이 함 수 는 어떻게 IO 컨 텍스트 와 snap 을 이용 하여 rbd 미 러 를 엽 니까?
Ceph 문서 에 서 는 C / C + 버 전의 librbd, 차 평 을 어떻게 사용 하 는 지 소개 하지 않 았 습 니 다.qemu 와 Ceph 의 소스 코드 를 스스로 대조 하여 분석 할 수 밖 에 없다.
Ceph 의 소스 코드 버 전: 0.94.1
src / include / rbd / librbd. h 에서 librbd 의 C 언어 인 터 페 이 스 를 설명 하 였 습 니 다.
그 중 rbdopen 의 정 의 는 다음 과 같 습 니 다.
CEPH_RBD_API int rbd_open(rados_ioctx_t io, const char *name, rbd_image_t *image, const char *snap_name);
rbd_close 의 정 의 는 다음 과 같다.
CEPH_RBD_API int rbd_close(rbd_image_t image);
우리 의 목 표 는 하나의 image 를 두 개의 pool, 하나의 ssd pool 과 하나의 hdd pool 에 연결 하 는 것 이다.원본 코드 를 변경 하 는 과정 에서 우 리 는 가능 한 한 원본 코드 를 적 게 바 꿉 니 다.현재 나의 생각 은:
현재 기본 pool 을 설정 하여 이 pool 에 이미 지 를 표시 한 다음 실행 하 는 과정 에서 image 에 대응 하 는 pool 이 바 뀔 수 있 습 니 다. 그러면 image 를 읽 고 쓰 는 코드 를 바 꿀 필요 가 없습니다.
librbd. h 에 대응 하 는 함수 성명 은 src / librbd / librbd. cc 에서 정 의 됩 니 다.
extern "C" int rbd_open(rados_ioctx_t p, const char *name, rbd_image_t *image,
 const char *snap_name) 
{
  librados::IoCtx io_ctx;
  librados::IoCtx::from_rados_ioctx_t(p, io_ctx);
  librbd::ImageCtx *ictx = new librbd::ImageCtx(name, "", snap_name, io_ctx,
 false);     //       
  tracepoint(librbd, open_image_enter, ictx, ictx->name.c_str(), ictx->id.c_str(), ictx->snap_name.c_str(), ictx->read_only);    //     ?
  int r = librbd::open_image(ictx);
    //  rbd   
  if (r >= 0)
    *image = (rbd_image_t)ictx;
            image,           
  tracepoint(librbd, open_image_exit, r);
  return r;
}
extern "C" int rbd_close(rbd_image_t image)
{
  librbd::ImageCtx *ctx = (librbd::ImageCtx *)image;
  tracepoint(librbd, close_image_enter, ctx, ctx->name.c_str(), ctx->id.c_str());
  librbd::close_image(ctx);
  tracepoint(librbd, close_image_exit);
  return 0;
}

좋은 웹페이지 즐겨찾기