📄 mux.c

📁 Linux kernel source code
💻 C
📖 Page 1 of 2
/*
 * net/9p/mux.c
 *
 * Protocol Multiplexer
 *
 *  Copyright (C) 2004 by Eric Van Hensbergen <ericvh@gmail.com>
 *  Copyright (C) 2004-2005 by Latchesar Ionkov <lucho@ionkov.net>
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License version 2
 *  as published by the Free Software Foundation.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to:
 *  Free Software Foundation
 *  51 Franklin Street, Fifth Floor
 *  Boston, MA  02111-1301  USA
 *
 */

#include <linux/module.h>
#include <linux/errno.h>
#include <linux/fs.h>
#include <linux/poll.h>
#include <linux/kthread.h>
#include <linux/idr.h>
#include <linux/mutex.h>
#include <net/9p/9p.h>
#include <linux/parser.h>
#include <net/9p/transport.h>
#include <net/9p/conn.h>

#define ERREQFLUSH	1
#define SCHED_TIMEOUT	10
#define MAXPOLLWADDR	2

enum {
	Rworksched = 1,		/* read work scheduled or running */
	Rpending = 2,		/* can read */
	Wworksched = 4,		/* write work scheduled or running */
	Wpending = 8,		/* can write */
};

enum {
	None,
	Flushing,
	Flushed,
};

struct p9_mux_poll_task;

struct p9_req {
	spinlock_t lock; /* protect request structure */
	int tag;
	struct p9_fcall *tcall;
	struct p9_fcall *rcall;
	int err;
	p9_conn_req_callback cb;
	void *cba;
	int flush;
	struct list_head req_list;
};

struct p9_conn {
	spinlock_t lock; /* protect lock structure */
	struct list_head mux_list;
	struct p9_mux_poll_task *poll_task;
	int msize;
	unsigned char *extended;
	struct p9_trans *trans;
	struct p9_idpool *tagpool;
	int err;
	wait_queue_head_t equeue;
	struct list_head req_list;
	struct list_head unsent_req_list;
	struct p9_fcall *rcall;
	int rpos;
	char *rbuf;
	int wpos;
	int wsize;
	char *wbuf;
	wait_queue_t poll_wait[MAXPOLLWADDR];
	wait_queue_head_t *poll_waddr[MAXPOLLWADDR];
	poll_table pt;
	struct work_struct rq;
	struct work_struct wq;
	unsigned long wsched;
};

struct p9_mux_poll_task {
	struct task_struct *task;
	struct list_head mux_list;
	int muxnum;
};

struct p9_mux_rpc {
	struct p9_conn *m;
	int err;
	struct p9_fcall *tcall;
	struct p9_fcall *rcall;
	wait_queue_head_t wqueue;
};

static int p9_poll_proc(void *);
static void p9_read_work(struct work_struct *work);
static void p9_write_work(struct work_struct *work);
static void p9_pollwait(struct file *filp, wait_queue_head_t *wait_address,
			poll_table *p);
static u16 p9_mux_get_tag(struct p9_conn *);
static void p9_mux_put_tag(struct p9_conn *, u16);

static DEFINE_MUTEX(p9_mux_task_lock);
static struct workqueue_struct *p9_mux_wq;

static int p9_mux_num;
static int p9_mux_poll_task_num;
static struct p9_mux_poll_task p9_mux_poll_tasks[100];

int p9_mux_global_init(void)
{
	int i;

	for (i = 0; i < ARRAY_SIZE(p9_mux_poll_tasks); i++)
		p9_mux_poll_tasks[i].task = NULL;

	p9_mux_wq = create_workqueue("v9fs");
	if (!p9_mux_wq) {
		printk(KERN_WARNING "v9fs: mux: creating workqueue failed\n");
		return -ENOMEM;
	}

	return 0;
}

void p9_mux_global_exit(void)
{
	destroy_workqueue(p9_mux_wq);
}
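/*
 * Hedged usage sketch, not part of mux.c: p9_mux_global_init() and
 * p9_mux_global_exit() bracket the lifetime of the shared "v9fs"
 * workqueue, so a caller pairs them in module init/exit (the real 9p
 * module wires this up elsewhere; p9_demo_init/p9_demo_exit below are
 * illustrative names).
 */
static int __init p9_demo_init(void)
{
	int ret;

	ret = p9_mux_global_init();	/* creates the workqueue */
	if (ret)
		return ret;
	/* ... register transports, etc. ... */
	return 0;
}

static void __exit p9_demo_exit(void)
{
	p9_mux_global_exit();		/* destroys the workqueue */
}

module_init(p9_demo_init);
module_exit(p9_demo_exit);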
/**
 * p9_mux_calc_poll_procs - calculates the number of polling procs
 * based on the number of mounted v9fs filesystems.
 *
 * The current implementation distributes the mounts evenly over the
 * poll procs (ceiling division), capped at the size of the task array.
 */
static int p9_mux_calc_poll_procs(int muxnum)
{
	int n;

	if (p9_mux_poll_task_num)
		n = muxnum / p9_mux_poll_task_num +
		    (muxnum % p9_mux_poll_task_num ? 1 : 0);
	else
		n = 1;

	if (n > ARRAY_SIZE(p9_mux_poll_tasks))
		n = ARRAY_SIZE(p9_mux_poll_tasks);

	return n;
}

static int p9_mux_poll_start(struct p9_conn *m)
{
	int i, n;
	struct p9_mux_poll_task *vpt, *vptlast;
	struct task_struct *pproc;

	P9_DPRINTK(P9_DEBUG_MUX, "mux %p muxnum %d procnum %d\n", m, p9_mux_num,
		p9_mux_poll_task_num);
	mutex_lock(&p9_mux_task_lock);

	n = p9_mux_calc_poll_procs(p9_mux_num + 1);
	if (n > p9_mux_poll_task_num) {
		for (i = 0; i < ARRAY_SIZE(p9_mux_poll_tasks); i++) {
			if (p9_mux_poll_tasks[i].task == NULL) {
				vpt = &p9_mux_poll_tasks[i];
				P9_DPRINTK(P9_DEBUG_MUX, "create proc %p\n",
									vpt);
				pproc = kthread_create(p9_poll_proc, vpt,
								"v9fs-poll");

				if (!IS_ERR(pproc)) {
					vpt->task = pproc;
					INIT_LIST_HEAD(&vpt->mux_list);
					vpt->muxnum = 0;
					p9_mux_poll_task_num++;
					wake_up_process(vpt->task);
				}
				break;
			}
		}

		if (i >= ARRAY_SIZE(p9_mux_poll_tasks))
			P9_DPRINTK(P9_DEBUG_ERROR,
					"warning: no free poll slots\n");
	}

	n = (p9_mux_num + 1) / p9_mux_poll_task_num +
	    ((p9_mux_num + 1) % p9_mux_poll_task_num ? 1 : 0);

	vptlast = NULL;
	for (i = 0; i < ARRAY_SIZE(p9_mux_poll_tasks); i++) {
		vpt = &p9_mux_poll_tasks[i];
		if (vpt->task != NULL) {
			vptlast = vpt;
			if (vpt->muxnum < n) {
				P9_DPRINTK(P9_DEBUG_MUX, "put in proc %d\n", i);
				list_add(&m->mux_list, &vpt->mux_list);
				vpt->muxnum++;
				m->poll_task = vpt;
				memset(&m->poll_waddr, 0,
							sizeof(m->poll_waddr));
				init_poll_funcptr(&m->pt, p9_pollwait);
				break;
			}
		}
	}

	if (i >= ARRAY_SIZE(p9_mux_poll_tasks)) {
		if (vptlast == NULL) {
			mutex_unlock(&p9_mux_task_lock);
			return -ENOMEM;
		}

		P9_DPRINTK(P9_DEBUG_MUX, "put in proc %d\n", i);
		list_add(&m->mux_list, &vptlast->mux_list);
		vptlast->muxnum++;
		m->poll_task = vptlast;
		memset(&m->poll_waddr, 0, sizeof(m->poll_waddr));
		init_poll_funcptr(&m->pt, p9_pollwait);
	}

	p9_mux_num++;
	mutex_unlock(&p9_mux_task_lock);

	return 0;
}

static void p9_mux_poll_stop(struct p9_conn *m)
{
	int i;
	struct p9_mux_poll_task *vpt;

	mutex_lock(&p9_mux_task_lock);
	vpt = m->poll_task;
	list_del(&m->mux_list);
	for (i = 0; i < ARRAY_SIZE(m->poll_waddr); i++) {
		if (m->poll_waddr[i] != NULL) {
			remove_wait_queue(m->poll_waddr[i], &m->poll_wait[i]);
			m->poll_waddr[i] = NULL;
		}
	}
	vpt->muxnum--;
	if (!vpt->muxnum) {
		P9_DPRINTK(P9_DEBUG_MUX, "destroy proc %p\n", vpt);
		kthread_stop(vpt->task);
		vpt->task = NULL;
		p9_mux_poll_task_num--;
	}
	p9_mux_num--;
	mutex_unlock(&p9_mux_task_lock);
}
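/*
 * Hedged illustration, not part of mux.c: the rounding in
 * p9_mux_calc_poll_procs() above is plain ceiling division (equivalent
 * to DIV_ROUND_UP() for positive counts).  For example, 7 muxes over
 * 3 poll tasks gives 7 / 3 + (7 % 3 ? 1 : 0) == 3 muxes per task.
 * demo_muxes_per_task is an illustrative name.
 */
static int demo_muxes_per_task(int muxnum, int ntasks)
{
	return muxnum / ntasks + (muxnum % ntasks ? 1 : 0);
}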
/**
 * p9_conn_create - allocate and initialize the per-session mux data
 * Creates the polling task if this is the first session.
 *
 * @trans - transport structure
 * @msize - maximum message size
 * @extended - pointer to the extended flag
 */
struct p9_conn *p9_conn_create(struct p9_trans *trans, int msize,
				    unsigned char *extended)
{
	int i, n;
	struct p9_conn *m, *mtmp;

	P9_DPRINTK(P9_DEBUG_MUX, "transport %p msize %d\n", trans, msize);
	m = kmalloc(sizeof(struct p9_conn), GFP_KERNEL);
	if (!m)
		return ERR_PTR(-ENOMEM);

	spin_lock_init(&m->lock);
	INIT_LIST_HEAD(&m->mux_list);
	m->msize = msize;
	m->extended = extended;
	m->trans = trans;
	m->tagpool = p9_idpool_create();
	if (IS_ERR(m->tagpool)) {
		mtmp = ERR_PTR(-ENOMEM);
		kfree(m);
		return mtmp;
	}

	m->err = 0;
	init_waitqueue_head(&m->equeue);
	INIT_LIST_HEAD(&m->req_list);
	INIT_LIST_HEAD(&m->unsent_req_list);
	m->rcall = NULL;
	m->rpos = 0;
	m->rbuf = NULL;
	m->wpos = m->wsize = 0;
	m->wbuf = NULL;
	INIT_WORK(&m->rq, p9_read_work);
	INIT_WORK(&m->wq, p9_write_work);
	m->wsched = 0;
	memset(&m->poll_waddr, 0, sizeof(m->poll_waddr));
	m->poll_task = NULL;
	n = p9_mux_poll_start(m);
	if (n) {
		kfree(m);
		return ERR_PTR(n);
	}

	n = trans->poll(trans, &m->pt);
	if (n & POLLIN) {
		P9_DPRINTK(P9_DEBUG_MUX, "mux %p can read\n", m);
		set_bit(Rpending, &m->wsched);
	}

	if (n & POLLOUT) {
		P9_DPRINTK(P9_DEBUG_MUX, "mux %p can write\n", m);
		set_bit(Wpending, &m->wsched);
	}

	for (i = 0; i < ARRAY_SIZE(m->poll_waddr); i++) {
		if (IS_ERR(m->poll_waddr[i])) {
			p9_mux_poll_stop(m);
			mtmp = (void *)m->poll_waddr;	/* the error code */
			kfree(m);
			m = mtmp;
			break;
		}
	}

	return m;
}
EXPORT_SYMBOL(p9_conn_create);

/**
 * p9_conn_destroy - cancels all pending requests and frees mux resources
 */
void p9_conn_destroy(struct p9_conn *m)
{
	P9_DPRINTK(P9_DEBUG_MUX, "mux %p prev %p next %p\n", m,
		m->mux_list.prev, m->mux_list.next);
	p9_conn_cancel(m, -ECONNRESET);

	if (!list_empty(&m->req_list)) {
		/* wait until all processes waiting on this session exit */
		P9_DPRINTK(P9_DEBUG_MUX,
			"mux %p waiting for empty request queue\n", m);
		wait_event_timeout(m->equeue, (list_empty(&m->req_list)), 5000);
		P9_DPRINTK(P9_DEBUG_MUX, "mux %p request queue empty: %d\n", m,
			list_empty(&m->req_list));
	}

	p9_mux_poll_stop(m);
	m->trans = NULL;
	p9_idpool_destroy(m->tagpool);
	kfree(m);
}
EXPORT_SYMBOL(p9_conn_destroy);

/**
 * p9_pollwait - called by a file's poll operation to add the v9fs-poll
 *	task to the file's wait queue
 */
static void
p9_pollwait(struct file *filp, wait_queue_head_t *wait_address,
	      poll_table *p)
{
	int i;
	struct p9_conn *m;

	m = container_of(p, struct p9_conn, pt);
	for (i = 0; i < ARRAY_SIZE(m->poll_waddr); i++)
		if (m->poll_waddr[i] == NULL)
			break;

	if (i >= ARRAY_SIZE(m->poll_waddr)) {
		P9_DPRINTK(P9_DEBUG_ERROR, "not enough wait_address slots\n");
		return;
	}

	m->poll_waddr[i] = wait_address;

	if (!wait_address) {
		P9_DPRINTK(P9_DEBUG_ERROR, "no wait_address\n");
		m->poll_waddr[i] = ERR_PTR(-EIO);
		return;
	}

	init_waitqueue_entry(&m->poll_wait[i], m->poll_task->task);
	add_wait_queue(wait_address, &m->poll_wait[i]);
}
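/*
 * Hedged caller-side sketch, not part of mux.c: a session creates its
 * mux over an already-connected transport and tears it down when done.
 * demo_session is an illustrative name; error handling is minimal.
 */
static int demo_session(struct p9_trans *trans, int msize)
{
	unsigned char extended = 1;	/* 9P2000.u semantics requested */
	struct p9_conn *conn;

	conn = p9_conn_create(trans, msize, &extended);
	if (IS_ERR(conn))
		return PTR_ERR(conn);

	/* ... issue requests, e.g. via p9_conn_rpc() ... */

	p9_conn_destroy(conn);
	return 0;
}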
/**
 * p9_poll_mux - polls a mux and schedules read or write work if necessary
 */
static void p9_poll_mux(struct p9_conn *m)
{
	int n;

	if (m->err < 0)
		return;

	n = m->trans->poll(m->trans, NULL);
	if (n < 0 || n & (POLLERR | POLLHUP | POLLNVAL)) {
		P9_DPRINTK(P9_DEBUG_MUX, "error mux %p err %d\n", m, n);
		if (n >= 0)
			n = -ECONNRESET;
		p9_conn_cancel(m, n);
	}

	if (n & POLLIN) {
		set_bit(Rpending, &m->wsched);
		P9_DPRINTK(P9_DEBUG_MUX, "mux %p can read\n", m);
		if (!test_and_set_bit(Rworksched, &m->wsched)) {
			P9_DPRINTK(P9_DEBUG_MUX, "schedule read work %p\n", m);
			queue_work(p9_mux_wq, &m->rq);
		}
	}

	if (n & POLLOUT) {
		set_bit(Wpending, &m->wsched);
		P9_DPRINTK(P9_DEBUG_MUX, "mux %p can write\n", m);
		if ((m->wsize || !list_empty(&m->unsent_req_list))
		    && !test_and_set_bit(Wworksched, &m->wsched)) {
			P9_DPRINTK(P9_DEBUG_MUX, "schedule write work %p\n", m);
			queue_work(p9_mux_wq, &m->wq);
		}
	}
}

/**
 * p9_poll_proc - polls all v9fs transports for new events and queues
 *	the appropriate work to the work queue
 */
static int p9_poll_proc(void *a)
{
	struct p9_conn *m, *mtmp;
	struct p9_mux_poll_task *vpt;

	vpt = a;
	P9_DPRINTK(P9_DEBUG_MUX, "start %p %p\n", current, vpt);
	while (!kthread_should_stop()) {
		set_current_state(TASK_INTERRUPTIBLE);

		list_for_each_entry_safe(m, mtmp, &vpt->mux_list, mux_list) {
			p9_poll_mux(m);
		}

		P9_DPRINTK(P9_DEBUG_MUX, "sleeping...\n");
		schedule_timeout(SCHED_TIMEOUT * HZ);
	}

	__set_current_state(TASK_RUNNING);
	P9_DPRINTK(P9_DEBUG_MUX, "finish\n");
	return 0;
}

/**
 * p9_write_work - called when a transport can send some data
 */
static void p9_write_work(struct work_struct *work)
{
	int n, err;
	struct p9_conn *m;
	struct p9_req *req;

	m = container_of(work, struct p9_conn, wq);

	if (m->err < 0) {
		clear_bit(Wworksched, &m->wsched);
		return;
	}

	if (!m->wsize) {
		if (list_empty(&m->unsent_req_list)) {
			clear_bit(Wworksched, &m->wsched);
			return;
		}

		spin_lock(&m->lock);
again:
		req = list_entry(m->unsent_req_list.next, struct p9_req,
			       req_list);
		list_move_tail(&req->req_list, &m->req_list);
		if (req->err == ERREQFLUSH)
			goto again;

		m->wbuf = req->tcall->sdata;
		m->wsize = req->tcall->size;
		m->wpos = 0;
		spin_unlock(&m->lock);
	}

	P9_DPRINTK(P9_DEBUG_MUX, "mux %p pos %d size %d\n", m, m->wpos,
								m->wsize);
	clear_bit(Wpending, &m->wsched);
	err = m->trans->write(m->trans, m->wbuf + m->wpos, m->wsize - m->wpos);
	P9_DPRINTK(P9_DEBUG_MUX, "mux %p sent %d bytes\n", m, err);
	if (err == -EAGAIN) {
		clear_bit(Wworksched, &m->wsched);
		return;
	}

	if (err < 0)
		goto error;
	else if (err == 0) {
		err = -EREMOTEIO;
		goto error;
	}

	m->wpos += err;
	if (m->wpos == m->wsize)
		m->wpos = m->wsize = 0;

	if (m->wsize == 0 && !list_empty(&m->unsent_req_list)) {
		if (test_and_clear_bit(Wpending, &m->wsched))
			n = POLLOUT;
		else
			n = m->trans->poll(m->trans, NULL);

		if (n & POLLOUT) {
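/*
 * The listing breaks off above inside p9_write_work() (page 1 of 2).
 * Hedged sketch, not part of mux.c, isolating the scheduling idiom
 * used throughout: the *pending bit caches the last poll result, and
 * test_and_set_bit() on the *worksched bit queues each work item at
 * most once until its handler clears the bit again.
 * demo_schedule_read is an illustrative name.
 */
static void demo_schedule_read(struct p9_conn *m)
{
	set_bit(Rpending, &m->wsched);		/* cache POLLIN */
	if (!test_and_set_bit(Rworksched, &m->wsched))
		queue_work(p9_mux_wq, &m->rq);	/* first scheduler wins */
}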
