📄 io-threads.c
字号:
int32_t op_ret, int32_t op_errno, struct stat *buf){ STACK_UNWIND (frame, op_ret, op_errno, buf); return 0;}static int32_t iot_stat_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc){ STACK_WIND (frame, iot_stat_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->stat, loc); return 0;}int32_t iot_stat (call_frame_t *frame, xlator_t *this, loc_t *loc){ call_stub_t *stub; iot_local_t *local = NULL; iot_worker_t *worker = NULL; iot_conf_t *conf; char fd_list_empty = 0; conf = this->private; local = calloc (1, sizeof (*local)); frame->local = local; LOCK (&(loc->inode->lock)); { if (list_empty (&(loc->inode->fds))) fd_list_empty = 1; } UNLOCK (&(loc->inode->lock)); if (fd_list_empty) { STACK_WIND(frame, iot_stat_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->stat, loc); return 0; } worker = iot_schedule (conf, NULL, loc->inode->ino); stub = fop_stat_stub (frame, iot_stat_wrapper, loc); if (!stub) { gf_log (this->name, GF_LOG_ERROR, "cannot get fop_stat call stub"); STACK_UNWIND (frame, -1, ENOMEM, NULL); return 0; } iot_queue (worker, stub); return 0;}int32_t iot_fstat_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct stat *buf){ STACK_UNWIND (frame, op_ret, op_errno, buf); return 0;}static int32_t iot_fstat_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd){ STACK_WIND (frame, iot_fstat_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fstat, fd); return 0;}int32_t iot_fstat (call_frame_t *frame, xlator_t *this, fd_t *fd){ call_stub_t *stub; iot_local_t *local = NULL; iot_file_t *file = NULL; iot_worker_t *worker = NULL; if (!dict_get (fd->ctx, this->name)) { gf_log (this->name, GF_LOG_ERROR, "fd context is NULL, returning EBADFD"); STACK_UNWIND (frame, -1, EBADFD); return 0; } file = data_to_ptr (dict_get (fd->ctx, this->name)); worker = file->worker; local = calloc (1, sizeof (*local)); frame->local = local; stub = fop_fstat_stub (frame, iot_fstat_wrapper, fd); if (!stub) { gf_log (this->name, GF_LOG_ERROR, "cannot get fop_fstat call stub"); STACK_UNWIND (frame, -1, ENOMEM, NULL); return 0; } iot_queue (worker, stub); return 0;}int32_t iot_truncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct stat *buf){ STACK_UNWIND (frame, op_ret, op_errno, buf); return 0;}static int32_t iot_truncate_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset){ STACK_WIND (frame, iot_truncate_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->truncate, loc, offset); return 0;}int32_t iot_truncate (call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset){ call_stub_t *stub; iot_local_t *local = NULL; iot_worker_t *worker = NULL; iot_conf_t *conf; char fd_list_empty = 0; conf = this->private; local = calloc (1, sizeof (*local)); frame->local = local; LOCK (&loc->inode->lock); { if (list_empty (&loc->inode->fds)) fd_list_empty = 1; } UNLOCK (&loc->inode->lock); if (fd_list_empty) { STACK_WIND(frame, iot_truncate_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->truncate, loc, offset); return 0; } worker = iot_schedule (conf, NULL, loc->inode->ino); stub = fop_truncate_stub (frame, iot_truncate_wrapper, loc, offset); if (!stub) { gf_log (this->name, GF_LOG_ERROR, "cannot get fop_stat call stub"); STACK_UNWIND (frame, -1, ENOMEM, NULL); return 0; } iot_queue (worker, stub); return 0;}int32_t iot_ftruncate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct stat *buf){ STACK_UNWIND (frame, op_ret, op_errno, buf); return 0;}static int32_t iot_ftruncate_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset){ STACK_WIND (frame, iot_ftruncate_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->ftruncate, fd, offset); return 0;}int32_t iot_ftruncate (call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset){ call_stub_t *stub; iot_local_t *local = NULL; iot_file_t *file = NULL; iot_worker_t *worker = NULL; if (!dict_get (fd->ctx, this->name)) { gf_log (this->name, GF_LOG_ERROR, "fd context is NULL, returning EBADFD"); STACK_UNWIND (frame, -1, EBADFD); return 0; } file = data_to_ptr (dict_get (fd->ctx, this->name)); worker = file->worker; local = calloc (1, sizeof (*local)); frame->local = local; stub = fop_ftruncate_stub (frame, iot_ftruncate_wrapper, fd, offset); if (!stub) { gf_log (this->name, GF_LOG_ERROR, "cannot get fop_ftruncate call stub"); STACK_UNWIND (frame, -1, ENOMEM, NULL); return 0; } iot_queue (worker, stub); return 0;}int32_t iot_utimens_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct stat *buf){ STACK_UNWIND (frame, op_ret, op_errno, buf); return 0;}static int32_t iot_utimens_wrapper (call_frame_t *frame, xlator_t *this, loc_t *loc, struct timespec tv[2]){ STACK_WIND (frame, iot_utimens_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->utimens, loc, tv); return 0;}int32_t iot_utimens (call_frame_t *frame, xlator_t *this, loc_t *loc, struct timespec tv[2]){ call_stub_t *stub; iot_local_t *local = NULL; iot_worker_t *worker = NULL; iot_conf_t *conf; char fd_list_empty = 0; conf = this->private; local = calloc (1, sizeof (*local)); frame->local = local; LOCK (&(loc->inode->lock)); { if (list_empty (&(loc->inode->fds))) fd_list_empty = 1; } UNLOCK (&(loc->inode->lock)); if (fd_list_empty) { STACK_WIND(frame, iot_utimens_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->utimens, loc, tv); return 0; } worker = iot_schedule (conf, NULL, loc->inode->ino); stub = fop_utimens_stub (frame, iot_utimens_wrapper, loc, tv); if (!stub) { gf_log (this->name, GF_LOG_ERROR, "cannot get fop_utimens call stub"); STACK_UNWIND (frame, -1, ENOMEM, NULL); return 0; } iot_queue (worker, stub); return 0;}static voidiot_queue (iot_worker_t *worker, call_stub_t *stub){ iot_queue_t *queue; iot_conf_t *conf = worker->conf; iot_local_t *local = stub->frame->local; size_t frame_size = local->frame_size; queue = calloc (1, sizeof (*queue)); queue->stub = stub; pthread_mutex_lock (&conf->lock); /* while (worker->queue_size >= worker->queue_limit) pthread_cond_wait (&worker->q_cond, &worker->lock); */ while (frame_size && (conf->current_size >= conf->cache_size)) pthread_cond_wait (&conf->q_cond, &conf->lock); queue->next = &worker->queue; queue->prev = worker->queue.prev; queue->next->prev = queue; queue->prev->next = queue; /* dq_cond */ worker->queue_size++; worker->q++; conf->current_size += local->frame_size; pthread_cond_broadcast (&worker->dq_cond); pthread_mutex_unlock (&conf->lock);}static call_stub_t *iot_dequeue (iot_worker_t *worker){ call_stub_t *stub = NULL; iot_queue_t *queue = NULL; iot_conf_t *conf = worker->conf; iot_local_t *local = NULL; pthread_mutex_lock (&conf->lock); while (!worker->queue_size) /* pthread_cond_wait (&worker->dq_cond, &worker->lock); */ pthread_cond_wait (&worker->dq_cond, &conf->lock); queue = worker->queue.next; queue->next->prev = queue->prev; queue->prev->next = queue->next; stub = queue->stub; local = stub->frame->local; worker->queue_size--; worker->dq++; /* q_cond */ conf->current_size -= local->frame_size; pthread_cond_broadcast (&conf->q_cond); pthread_mutex_unlock (&conf->lock); freee (queue); return stub;}static void *iot_worker (void *arg){ iot_worker_t *worker = arg; while (1) { call_stub_t *stub; stub = iot_dequeue (worker); call_resume (stub); }}#if 0static void *iot_reply (void *arg){ iot_worker_t *reply = arg; while (1) { call_stub_t *stub; stub = iot_dequeue (reply); freee (stub->frame->local); stub->frame->local = NULL; call_resume (stub); }}#endifstatic voidworkers_init (iot_conf_t *conf){ int i; conf->workers.next = &conf->workers; conf->workers.prev = &conf->workers; for (i=0; i<conf->thread_count; i++) { iot_worker_t *worker = calloc (1, sizeof (*worker)); worker->next = &conf->workers; worker->prev = conf->workers.prev; worker->next->prev = worker; worker->prev->next = worker; worker->queue.next = &worker->queue; worker->queue.prev = &worker->queue; /* pthread_mutex_init (&worker->lock, NULL); pthread_cond_init (&worker->q_cond, NULL); */ pthread_cond_init (&worker->dq_cond, NULL); /* worker->queue_limit = conf->queue_limit; */ worker->conf = conf; pthread_create (&worker->thread, NULL, iot_worker, worker); }}int32_t init (xlator_t *this){ iot_conf_t *conf; dict_t *options = this->options; if (!this->children || this->children->next) { gf_log ("io-threads", GF_LOG_ERROR, "FATAL: iot not configured with exactly one child"); return -1; } conf = (void *) calloc (1, sizeof (*conf)); conf->thread_count = 1; if (dict_get (options, "thread-count")) { conf->thread_count = data_to_int32 (dict_get (options, "thread-count")); gf_log ("io-threads", GF_LOG_DEBUG, "Using conf->thread_count = %d", conf->thread_count); } /* conf->queue_limit = 64; if (dict_get (options, "queue-limit")) { conf->queue_limit = data_to_int (dict_get (options, "queue-limit")); gf_log ("io-threads", GF_LOG_DEBUG, "Using conf->queue_limit = %d", conf->queue_limit); } */ conf->cache_size = 1048576 * 64; if (dict_get (options, "cache-size")) { conf->cache_size = gf_str_to_long_long (data_to_str (dict_get (options, "cache-size"))); gf_log ("io-threads", GF_LOG_DEBUG, "Using conf->cache_size = %lld", conf->cache_size); } pthread_mutex_init (&conf->lock, NULL); pthread_cond_init (&conf->q_cond, NULL); conf->files.next = &conf->files; conf->files.prev = &conf->files; pthread_mutex_init (&conf->files_lock, NULL); workers_init (conf); this->private = conf; return 0;}voidfini (xlator_t *this){ iot_conf_t *conf = this->private; freee (conf); this->private = NULL; return;}struct xlator_fops fops = { .open = iot_open, .create = iot_create, .readv = iot_readv, .writev = iot_writev, .flush = iot_flush, .fsync = iot_fsync, .lk = iot_lk, .stat = iot_stat, .fstat = iot_fstat, .truncate = iot_truncate, .ftruncate = iot_ftruncate, .utimens = iot_utimens, .close = iot_close,};struct xlator_mops mops = {};
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -