⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 io-threads.c

📁 分布式文件系统
💻 C
📖 第 1 页 / 共 2 页
字号:
              int32_t op_ret,              int32_t op_errno,              struct stat *buf){  STACK_UNWIND (frame, op_ret, op_errno, buf);  return 0;}static int32_t iot_stat_wrapper (call_frame_t *frame,                  xlator_t *this,                  loc_t *loc){  STACK_WIND (frame,              iot_stat_cbk,              FIRST_CHILD(this),              FIRST_CHILD(this)->fops->stat,              loc);  return 0;}int32_t iot_stat (call_frame_t *frame,          xlator_t *this,          loc_t *loc){  call_stub_t *stub;  iot_local_t *local = NULL;  iot_worker_t *worker = NULL;  iot_conf_t *conf;  char fd_list_empty = 0;   conf = this->private;  local = calloc (1, sizeof (*local));  frame->local = local;  LOCK (&(loc->inode->lock));  {    if (list_empty (&(loc->inode->fds)))      fd_list_empty = 1;  }  UNLOCK (&(loc->inode->lock));  if (fd_list_empty) {    STACK_WIND(frame,               iot_stat_cbk,               FIRST_CHILD(this),               FIRST_CHILD(this)->fops->stat,               loc);    return 0;  }   worker = iot_schedule (conf, NULL, loc->inode->ino);  stub = fop_stat_stub (frame,                        iot_stat_wrapper,                        loc);  if (!stub) {    gf_log (this->name, GF_LOG_ERROR, "cannot get fop_stat call stub");    STACK_UNWIND (frame, -1, ENOMEM, NULL);    return 0;  }  iot_queue (worker, stub);  return 0;}int32_t iot_fstat_cbk (call_frame_t *frame,               void *cookie,               xlator_t *this,               int32_t op_ret,               int32_t op_errno,               struct stat *buf){  STACK_UNWIND (frame, op_ret, op_errno, buf);  return 0;}static int32_t iot_fstat_wrapper (call_frame_t *frame,                   xlator_t *this,                   fd_t *fd){  STACK_WIND (frame,              iot_fstat_cbk,              FIRST_CHILD(this),              FIRST_CHILD(this)->fops->fstat,              fd);  return 0;}int32_t iot_fstat (call_frame_t *frame,           xlator_t *this,           fd_t *fd){  call_stub_t *stub;  iot_local_t *local = NULL;  iot_file_t *file = NULL;  iot_worker_t *worker = NULL;  if (!dict_get (fd->ctx, this->name)) {    gf_log (this->name, GF_LOG_ERROR, "fd context is NULL, returning EBADFD");    STACK_UNWIND (frame, -1, EBADFD);    return 0;  }  file = data_to_ptr (dict_get (fd->ctx, this->name));  worker = file->worker;  local = calloc (1, sizeof (*local));  frame->local = local;  stub = fop_fstat_stub (frame,                         iot_fstat_wrapper,                         fd);  if (!stub) {    gf_log (this->name, GF_LOG_ERROR, "cannot get fop_fstat call stub");    STACK_UNWIND (frame, -1, ENOMEM, NULL);    return 0;  }  iot_queue (worker, stub);  return 0;}int32_t iot_truncate_cbk (call_frame_t *frame,                  void *cookie,                  xlator_t *this,                  int32_t op_ret,                  int32_t op_errno,                  struct stat *buf){  STACK_UNWIND (frame, op_ret, op_errno, buf);  return 0;}static int32_t iot_truncate_wrapper (call_frame_t *frame,                      xlator_t *this,                      loc_t *loc,                      off_t offset){  STACK_WIND (frame,              iot_truncate_cbk,              FIRST_CHILD(this),              FIRST_CHILD(this)->fops->truncate,              loc,              offset);  return 0;}int32_t iot_truncate (call_frame_t *frame,              xlator_t *this,              loc_t *loc,              off_t offset){  call_stub_t *stub;  iot_local_t *local = NULL;  iot_worker_t *worker = NULL;  iot_conf_t *conf;  char fd_list_empty = 0;    conf = this->private;  local = calloc (1, sizeof (*local));  frame->local = local;  LOCK (&loc->inode->lock);  {    if (list_empty (&loc->inode->fds))      fd_list_empty = 1;  }  UNLOCK (&loc->inode->lock);  if (fd_list_empty) {    STACK_WIND(frame,               iot_truncate_cbk,               FIRST_CHILD(this),               FIRST_CHILD(this)->fops->truncate,               loc,               offset);    return 0;  }   worker = iot_schedule (conf, NULL, loc->inode->ino);  stub = fop_truncate_stub (frame,                            iot_truncate_wrapper,                            loc,                            offset);  if (!stub) {    gf_log (this->name, GF_LOG_ERROR, "cannot get fop_stat call stub");    STACK_UNWIND (frame, -1, ENOMEM, NULL);    return 0;  }  iot_queue (worker, stub);  return 0;}int32_t iot_ftruncate_cbk (call_frame_t *frame,                   void *cookie,                   xlator_t *this,                   int32_t op_ret,                   int32_t op_errno,                   struct stat *buf){  STACK_UNWIND (frame, op_ret, op_errno, buf);  return 0;}static int32_t iot_ftruncate_wrapper (call_frame_t *frame,                       xlator_t *this,                       fd_t *fd,                       off_t offset){  STACK_WIND (frame,              iot_ftruncate_cbk,              FIRST_CHILD(this),              FIRST_CHILD(this)->fops->ftruncate,              fd,              offset);  return 0;}int32_t iot_ftruncate (call_frame_t *frame,               xlator_t *this,               fd_t *fd,               off_t offset){  call_stub_t *stub;  iot_local_t *local = NULL;  iot_file_t *file = NULL;  iot_worker_t *worker = NULL;  if (!dict_get (fd->ctx, this->name)) {    gf_log (this->name, GF_LOG_ERROR, "fd context is NULL, returning EBADFD");    STACK_UNWIND (frame, -1, EBADFD);    return 0;  }  file = data_to_ptr (dict_get (fd->ctx, this->name));  worker = file->worker;  local = calloc (1, sizeof (*local));  frame->local = local;  stub = fop_ftruncate_stub (frame,                             iot_ftruncate_wrapper,                             fd,                             offset);  if (!stub) {    gf_log (this->name, GF_LOG_ERROR, "cannot get fop_ftruncate call stub");    STACK_UNWIND (frame, -1, ENOMEM, NULL);    return 0;  }  iot_queue (worker, stub);  return 0;}int32_t iot_utimens_cbk (call_frame_t *frame,                 void *cookie,                 xlator_t *this,                 int32_t op_ret,                 int32_t op_errno,                 struct stat *buf){  STACK_UNWIND (frame, op_ret, op_errno, buf);  return 0;}static int32_t iot_utimens_wrapper (call_frame_t *frame,                     xlator_t *this,                     loc_t *loc,                     struct timespec tv[2]){  STACK_WIND (frame,              iot_utimens_cbk,              FIRST_CHILD(this),              FIRST_CHILD(this)->fops->utimens,              loc,              tv);    return 0;}int32_t iot_utimens (call_frame_t *frame,             xlator_t *this,             loc_t *loc,             struct timespec tv[2]){  call_stub_t *stub;  iot_local_t *local = NULL;  iot_worker_t *worker = NULL;  iot_conf_t *conf;  char fd_list_empty = 0;    conf = this->private;  local = calloc (1, sizeof (*local));  frame->local = local;  LOCK (&(loc->inode->lock));  {    if (list_empty (&(loc->inode->fds)))	fd_list_empty = 1;  }  UNLOCK (&(loc->inode->lock));  if (fd_list_empty) {    STACK_WIND(frame,               iot_utimens_cbk,               FIRST_CHILD(this),               FIRST_CHILD(this)->fops->utimens,               loc,               tv);    return 0;  }   worker = iot_schedule (conf, NULL, loc->inode->ino);  stub = fop_utimens_stub (frame,			   iot_utimens_wrapper,			   loc,			   tv);  if (!stub) {    gf_log (this->name, GF_LOG_ERROR, "cannot get fop_utimens call stub");    STACK_UNWIND (frame, -1, ENOMEM, NULL);    return 0;  }  iot_queue (worker, stub);  return 0;}static voidiot_queue (iot_worker_t *worker,           call_stub_t *stub){  iot_queue_t *queue;  iot_conf_t *conf = worker->conf;  iot_local_t *local = stub->frame->local;  size_t frame_size = local->frame_size;  queue = calloc (1, sizeof (*queue));  queue->stub = stub;  pthread_mutex_lock (&conf->lock);  /*    while (worker->queue_size >= worker->queue_limit)      pthread_cond_wait (&worker->q_cond, &worker->lock);  */  while (frame_size && (conf->current_size >= conf->cache_size))    pthread_cond_wait (&conf->q_cond, &conf->lock);  queue->next = &worker->queue;  queue->prev = worker->queue.prev;  queue->next->prev = queue;  queue->prev->next = queue;  /* dq_cond */  worker->queue_size++;  worker->q++;  conf->current_size += local->frame_size;  pthread_cond_broadcast (&worker->dq_cond);  pthread_mutex_unlock (&conf->lock);}static call_stub_t *iot_dequeue (iot_worker_t *worker){  call_stub_t *stub = NULL;  iot_queue_t *queue = NULL;  iot_conf_t *conf = worker->conf;  iot_local_t *local = NULL;  pthread_mutex_lock (&conf->lock);  while (!worker->queue_size)    /*      pthread_cond_wait (&worker->dq_cond, &worker->lock);    */    pthread_cond_wait (&worker->dq_cond, &conf->lock);  queue = worker->queue.next;  queue->next->prev = queue->prev;  queue->prev->next = queue->next;  stub = queue->stub;  local = stub->frame->local;  worker->queue_size--;  worker->dq++;  /* q_cond */  conf->current_size -= local->frame_size;  pthread_cond_broadcast (&conf->q_cond);  pthread_mutex_unlock (&conf->lock);  freee (queue);  return stub;}static void *iot_worker (void *arg){  iot_worker_t *worker = arg;  while (1) {    call_stub_t *stub;    stub = iot_dequeue (worker);    call_resume (stub);  }}#if 0static void *iot_reply (void *arg){  iot_worker_t *reply = arg;  while (1) {    call_stub_t *stub;    stub = iot_dequeue (reply);    freee (stub->frame->local);    stub->frame->local = NULL;    call_resume (stub);  }}#endifstatic voidworkers_init (iot_conf_t *conf){  int i;  conf->workers.next = &conf->workers;  conf->workers.prev = &conf->workers;  for (i=0; i<conf->thread_count; i++) {    iot_worker_t *worker = calloc (1, sizeof (*worker));    worker->next = &conf->workers;    worker->prev = conf->workers.prev;    worker->next->prev = worker;    worker->prev->next = worker;    worker->queue.next = &worker->queue;    worker->queue.prev = &worker->queue;    /*      pthread_mutex_init (&worker->lock, NULL);      pthread_cond_init (&worker->q_cond, NULL);    */    pthread_cond_init (&worker->dq_cond, NULL);    /*      worker->queue_limit = conf->queue_limit;    */    worker->conf = conf;    pthread_create (&worker->thread, NULL, iot_worker, worker);  }}int32_t init (xlator_t *this){  iot_conf_t *conf;  dict_t *options = this->options;  if (!this->children || this->children->next) {    gf_log ("io-threads",      GF_LOG_ERROR,      "FATAL: iot not configured with exactly one child");    return -1;  }  conf = (void *) calloc (1, sizeof (*conf));  conf->thread_count = 1;  if (dict_get (options, "thread-count")) {    conf->thread_count = data_to_int32 (dict_get (options,            "thread-count"));    gf_log ("io-threads",      GF_LOG_DEBUG,      "Using conf->thread_count = %d",      conf->thread_count);  }  /*  conf->queue_limit = 64;  if (dict_get (options, "queue-limit")) {    conf->queue_limit = data_to_int (dict_get (options,                 "queue-limit"));    gf_log ("io-threads",      GF_LOG_DEBUG,      "Using conf->queue_limit = %d",      conf->queue_limit);  }  */  conf->cache_size = 1048576 * 64;  if (dict_get (options, "cache-size")) {    conf->cache_size = gf_str_to_long_long (data_to_str (dict_get (options,                "cache-size")));    gf_log ("io-threads",      GF_LOG_DEBUG,      "Using conf->cache_size = %lld",      conf->cache_size);  }  pthread_mutex_init (&conf->lock, NULL);  pthread_cond_init (&conf->q_cond, NULL);  conf->files.next = &conf->files;  conf->files.prev = &conf->files;  pthread_mutex_init (&conf->files_lock, NULL);  workers_init (conf);  this->private = conf;  return 0;}voidfini (xlator_t *this){  iot_conf_t *conf = this->private;  freee (conf);  this->private = NULL;  return;}struct xlator_fops fops = {  .open        = iot_open,  .create      = iot_create,  .readv       = iot_readv,  .writev      = iot_writev,  .flush       = iot_flush,  .fsync       = iot_fsync,  .lk          = iot_lk,  .stat        = iot_stat,  .fstat       = iot_fstat,  .truncate    = iot_truncate,  .ftruncate   = iot_ftruncate,  .utimens     = iot_utimens,  .close       = iot_close,};struct xlator_mops mops = {};

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -