📄 io-threads.c
字号:
/* Copyright (c) 2006, 2007, 2008 Z RESEARCH, Inc. <http://www.zresearch.com> This file is part of GlusterFS. GlusterFS is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. GlusterFS is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <http://www.gnu.org/licenses/>.*/#ifndef _CONFIG_H#define _CONFIG_H#include "config.h"#endif#include "call-stub.h"#include "glusterfs.h"#include "logging.h"#include "dict.h"#include "xlator.h"#include "io-threads.h"static voidiot_queue (iot_worker_t *worker, call_stub_t *stub);static call_stub_t *iot_dequeue (iot_worker_t *worker);static iot_worker_t * iot_schedule (iot_conf_t *conf, iot_file_t *file, ino_t ino){ int32_t cnt = (ino % conf->thread_count); iot_worker_t *trav = conf->workers.next; for (; cnt; cnt--) trav = trav->next; if (file) file->worker = trav; trav->fd_count++; return trav;}int32_tiot_open_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, fd_t *fd){ iot_conf_t *conf = this->private; if (op_ret >= 0) { iot_file_t *file = calloc (1, sizeof (*file)); iot_schedule (conf, file, fd->inode->ino); file->fd = fd; dict_set (fd->ctx, this->name, data_from_static_ptr (file)); pthread_mutex_lock (&conf->files_lock); file->next = &conf->files; file->prev = file->next->prev; file->next->prev = file; file->prev->next = file; pthread_mutex_unlock (&conf->files_lock); } STACK_UNWIND (frame, op_ret, op_errno, fd); return 0;}int32_tiot_open (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, fd_t *fd){ STACK_WIND (frame, iot_open_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->open, loc, flags, fd); return 0;}int32_tiot_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, fd_t *fd, inode_t *inode, struct stat *stbuf){ iot_conf_t *conf = this->private; if (op_ret >= 0) { iot_file_t *file = calloc (1, sizeof (*file)); iot_schedule (conf, file, fd->inode->ino); file->fd = fd; dict_set (fd->ctx, this->name, data_from_static_ptr (file)); pthread_mutex_lock (&conf->files_lock); file->next = &conf->files; file->prev = file->next->prev; file->next->prev = file; file->prev->next = file; pthread_mutex_unlock (&conf->files_lock); } STACK_UNWIND (frame, op_ret, op_errno, fd, inode, stbuf); return 0;}int32_tiot_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, mode_t mode, fd_t *fd){ STACK_WIND (frame, iot_create_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->create, loc, flags, mode, fd); return 0;}int32_tiot_close_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno){ iot_conf_t *conf = this->private; iot_local_t *local = frame->local; iot_file_t *file = local->file; pthread_mutex_lock (&conf->files_lock); { file->prev->next = file->next; file->next->prev = file->prev; } pthread_mutex_unlock (&conf->files_lock); file->worker->fd_count--; file->worker = NULL; freee (file); STACK_UNWIND (frame, op_ret, op_errno); return 0;}static int32_tiot_close_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd){ STACK_WIND (frame, iot_close_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->close, fd); return 0;}int32_tiot_close (call_frame_t *frame, xlator_t *this, fd_t *fd){ call_stub_t *stub; iot_local_t *local = NULL; iot_file_t *file = NULL; iot_worker_t *worker = NULL; if (!dict_get (fd->ctx, this->name)) { gf_log (this->name, GF_LOG_ERROR, "fd context is NULL, returning EBADFD"); STACK_UNWIND (frame, -1, EBADFD); return 0; } file = data_to_ptr (dict_get (fd->ctx, this->name)); worker = file->worker; local = calloc (1, sizeof (*local)); local->file = file; frame->local = local; stub = fop_close_stub (frame, iot_close_wrapper, fd); if (!stub) { gf_log (this->name, GF_LOG_ERROR, "cannot get close call stub"); STACK_UNWIND (frame, -1, ENOMEM); return 0; } iot_queue (worker, stub); return 0;}int32_tiot_readv_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iovec *vector, int32_t count, struct stat *stbuf){ iot_local_t *local = frame->local; local->frame_size = 0; //iov_length (vector, count); STACK_UNWIND (frame, op_ret, op_errno, vector, count, stbuf); return 0;}static int32_tiot_readv_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, off_t offset){ STACK_WIND (frame, iot_readv_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->readv, fd, size, offset); return 0;}int32_tiot_readv (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size, off_t offset){ call_stub_t *stub; iot_local_t *local = NULL; iot_file_t *file = NULL; iot_worker_t *worker = NULL; if (!dict_get (fd->ctx, this->name)) { gf_log (this->name, GF_LOG_ERROR, "fd context is NULL, returning EBADFD"); STACK_UNWIND (frame, -1, EBADFD); return 0; } file = data_to_ptr (dict_get (fd->ctx, this->name)); worker = file->worker; local = calloc (1, sizeof (*local)); frame->local = local; stub = fop_readv_stub (frame, iot_readv_wrapper, fd, size, offset); if (!stub) { gf_log (this->name, GF_LOG_ERROR, "cannot get readv call stub"); STACK_UNWIND (frame, -1, ENOMEM, NULL, 0); return 0; } iot_queue (worker, stub); return 0;}int32_tiot_flush_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno){ STACK_UNWIND (frame, op_ret, op_errno); return 0;}static int32_tiot_flush_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd){ STACK_WIND (frame, iot_flush_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->flush, fd); return 0;}int32_tiot_flush (call_frame_t *frame, xlator_t *this, fd_t *fd){ call_stub_t *stub; iot_local_t *local = NULL; iot_file_t *file = NULL; iot_worker_t *worker = NULL; if (!dict_get (fd->ctx, this->name)) { gf_log (this->name, GF_LOG_ERROR, "fd context is NULL, returning EBADFD"); STACK_UNWIND (frame, -1, EBADFD); return 0; } file = data_to_ptr (dict_get (fd->ctx, this->name)); worker = file->worker; local = calloc (1, sizeof (*local)); frame->local = local; stub = fop_flush_stub (frame, iot_flush_wrapper, fd); if (!stub) { gf_log (this->name, GF_LOG_ERROR, "cannot get flush_cbk call stub"); STACK_UNWIND (frame, -1, ENOMEM); return 0; } iot_queue (worker, stub); return 0;}int32_tiot_fsync_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno){ STACK_UNWIND (frame, op_ret, op_errno); return 0;}static int32_tiot_fsync_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t datasync){ STACK_WIND (frame, iot_fsync_cbk, FIRST_CHILD (this), FIRST_CHILD (this)->fops->fsync, fd, datasync); return 0;}int32_tiot_fsync (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t datasync){ call_stub_t *stub; iot_local_t *local = NULL; iot_file_t *file = NULL; iot_worker_t *worker = NULL; if (!dict_get (fd->ctx, this->name)) { gf_log (this->name, GF_LOG_ERROR, "fd context is NULL, returning EBADFD"); STACK_UNWIND (frame, -1, EBADFD); return 0; } file = data_to_ptr (dict_get (fd->ctx, this->name)); worker = file->worker; local = calloc (1, sizeof (*local)); frame->local = local; stub = fop_fsync_stub (frame, iot_fsync_wrapper, fd, datasync); if (!stub) { gf_log (this->name, GF_LOG_ERROR, "cannot get fsync_cbk call stub"); STACK_UNWIND (frame, -1, ENOMEM); return 0; } iot_queue (worker, stub); return 0;}int32_tiot_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct stat *stbuf){ iot_local_t *local = frame->local; local->frame_size = 0; /* hehe, caught me! */ STACK_UNWIND (frame, op_ret, op_errno, stbuf); return 0;}static int32_tiot_writev_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *vector, int32_t count, off_t offset){ STACK_WIND (frame, iot_writev_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->writev, fd, vector, count, offset); return 0;}int32_tiot_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, struct iovec *vector, int32_t count, off_t offset){ call_stub_t *stub; iot_local_t *local = NULL; iot_file_t *file = NULL; iot_worker_t *worker = NULL; if (!dict_get (fd->ctx, this->name)) { gf_log (this->name, GF_LOG_ERROR, "fd context is NULL, returning EBADFD"); STACK_UNWIND (frame, -1, EBADFD); return 0; } file = data_to_ptr (dict_get (fd->ctx, this->name)); worker = file->worker; local = calloc (1, sizeof (*local)); if (frame->root->req_refs) local->frame_size = dict_serialized_length (frame->root->req_refs); else local->frame_size = iov_length (vector, count); frame->local = local; stub = fop_writev_stub (frame, iot_writev_wrapper, fd, vector, count, offset); if (!stub) { gf_log (this->name, GF_LOG_ERROR, "cannot get writev call stub"); STACK_UNWIND (frame, -1, ENOMEM); return 0; } iot_queue (worker, stub); return 0;}int32_tiot_lk_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct flock *flock){ STACK_UNWIND (frame, op_ret, op_errno, flock); return 0;}static int32_tiot_lk_wrapper (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t cmd, struct flock *flock){ STACK_WIND (frame, iot_lk_cbk, FIRST_CHILD(this), FIRST_CHILD(this)->fops->lk, fd, cmd, flock); return 0;}int32_tiot_lk (call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t cmd, struct flock *flock){ call_stub_t *stub; iot_local_t *local = NULL; iot_file_t *file = NULL; iot_worker_t *worker = NULL; if (!dict_get (fd->ctx, this->name)) { gf_log (this->name, GF_LOG_ERROR, "fd context is NULL, returning EBADFD"); STACK_UNWIND (frame, -1, EBADFD); return 0; } file = data_to_ptr (dict_get (fd->ctx, this->name)); worker = file->worker; local = calloc (1, sizeof (*local)); frame->local = local; stub = fop_lk_stub (frame, iot_lk_wrapper, fd, cmd, flock); if (!stub) { gf_log (this->name, GF_LOG_ERROR, "cannot get fop_lk call stub"); STACK_UNWIND (frame, -1, ENOMEM, NULL); return 0; } iot_queue (worker, stub); return 0;}int32_t iot_stat_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -