⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 gnbd_trans.c

📁 OpenGFS, a kind of file system.
💻 C
字号:
/* * *    Copyright 1999 Regents of the University of Minnesota *    Portions Copyright 1999-2001 Sistina Software, Inc. *    Portions Copyright 2001 The OpenGFS Project * *    This is free software released under the GNU General Public License. *    There is no warranty for this software.  See the file COPYING for *    details. * *    See the file AUTHORS for a list of contributors. * *//* * This is where all of the actual data io between server and client happens. */#define __KERNEL_SYSCALLS__#include <linux/module.h>#include <linux/blkdev.h>#include <linux/blk.h>#include <linux/kernel.h>#include <linux/slab.h>#include <linux/fs.h>#include <linux/string.h>#include <linux/smp_lock.h>#include <linux/unistd.h>#include <linux/net.h>#include <linux/in.h>#include <linux/socket.h>#include <net/sock.h>#include <asm/uaccess.h>#include "trans.h"#include "gnbd_macros.h"#include "gnbd_kern.h"#include "gnbd.h"/* * prototype the data_xmit funtion here so I can leave it at the bottom of * this file. */typedef enum {	DXwrite = 0,	DXread,	DXpeek} DXtype;static int data_xmit(DXtype send, struct socket *sock, void *buf, u32 size);static char *DXstrs[] = { "write", "read", "peek" };/* * cmd_RW() */voidcmd_RW(gnbd_t * tio, struct request *req, u32 len, u64 offset){	transferQ_t *tsQ;	theader_t ioHdr;	int err = 0;	ASSERT(tio != NULL);	ASSERT(req != NULL);	ASSERT(tio->sock != NULL);	ASSERT(req->buffer != NULL);	if (!tio->recvd_run) {		req->errors = -ESHUTDOWN;		my_end_request(req);		goto exit_cmd_rw;	}	tsQ = kmalloc(sizeof(transferQ_t), GFP_KERNEL);	if (!tsQ) {		req->errors = -ENOMEM;		my_end_request(req);		goto exit_cmd_rw;	}	/*Setup reply space */	tsQ->hdr.type = (req->cmd == READ) ? tioReadReq : tioWriteReq;	tsQ->hdr.key = (unsigned long) tsQ;	tsQ->hdr.len = len;	tsQ->hdr.offset = offset;	tsQ->hdr.err = 0;	tsQ->req = req;	tsQ->next = NULL;	/*build header */	ioHdr.type = (req->cmd == READ) ? 
tioReadReq : tioWriteReq;	ioHdr.err = 0;	ioHdr.key = tsQ->hdr.key;	/* untouch by server; don't need byte order */	ioHdr.offset = cpu_to_be64(offset);	ioHdr.len = cpu_to_be32(len);	/* for a read this is the amount we want.					   NOT the amount following. */	printd("cmd_RW: cmd=%x buffer=0x%p key=0x%llx offset=0x%llx len=0x%x\n"			req->cmd, req->buffer, tsQ->hdr.key, offset, len);#if 0	/*Queue reply */	down(&tio->tqmux);	tsQ->next = tio->Rhead;	tio->Rhead = tsQ;	up(&tio->tqmux);#endif	/* FIXME: problem is here. durring a reconnect, requests can be queued	 * before the re-requests are made.  This allows a request to be sent	 * twice.  And thus received twice, which causes problems.	 * I wonder if I should just nest these.  Gonna have to make sure I can	 * do that, but it may be the best solution. (or it may not)	 */	/* Does a simular problem exist with the new reconnect? Have to find	 * out.	 * Have to get it working first....	 */	/*send buffer */	printd("About to send request data\n");	spin_unlock_irq(&io_request_lock);	down(&tio->sender);	err = data_xmit(DXwrite, tio->sock, &ioHdr, sizeof(theader_t));	if (req->cmd == WRITE)		err = data_xmit(DXwrite, tio->sock, req->buffer, len);	up(&tio->sender);	spin_lock_irq(&io_request_lock);	if (err == 0 || err == -ECONNRESET || err == -EPIPE)		/* shutdown transiod. */		trans_shutdown(tio, 0);	/* I wonder is putting this down here opens the possibility of this	 * getting recieved before I can put it onto the queue...	 */	/*Queue reply */	down(&tio->tqmux);	tsQ->next = tio->Rhead;	tio->Rhead = tsQ;	up(&tio->tqmux);      exit_cmd_rw:	return;}/* * empty_transferQ_list() * Clears out and pending requests by setting the err, ending the request, * and then freeing the memory. 
*/voidempty_transferQ_list(gnbd_t * tio){	transferQ_t *tmp, *kill = NULL;	printd("Entering empty_transferQ_list\n");	if (tio == NULL)		return;	if (!tio->Rhead)		return;	printd("Proceding to empty transferQ\n");	down(&tio->tqmux);	printd("Got tqmux, going to town.\n");	tmp = tio->Rhead;	while (tmp != NULL) {		kill = tmp;		tmp = tmp->next;		if (kill->req == NULL) {			printe("kill(%p)->req was null\n", kill);			kfree(kill);			continue;		}		kill->req->errors = -ESHUTDOWN;		my_end_request(kill->req);		printd("freeing pointer 0x%p\n", kill);		kfree(kill);	}	tio->Rhead = NULL;	up(&tio->tqmux);}/* * transiod() * This thread handles the incomming data on the socket. * There will be a version of this thread running for each device opened. */inttransiod(void *data){	gnbd_t *tio = (gnbd_t *) data;	theader_t ioHdr;	int err;	/*  Set up the thread  */	lock_kernel();	daemonize();	sprintf(current->comm, "gnbd_transiod");	up(&tio->tiodmux);	printd("starting transiod loop\n");	/* now wait for data */	while (tio->recvd_run) {		printd("==top of transiod loop\n");		ASSERT(tio != NULL);		if (tio->sock == NULL)			break;		err = data_xmit(DXread, tio->sock, &ioHdr, sizeof(theader_t));		printd("Got header.\n");		if (err <= 0) {	/* if read 0, socket closed. */			printe("Error trying to read header (%d)\n", err);			tio->good_connect = 0;			goto shutdowntransiod;		}		if (!tio->recvd_run)			goto shutdowntransiod;		be32_to_cpus(&ioHdr.len);		/* Received a shutdown command from the server, reply with a shutdown		 * and stop this thread.		 */		if (ioHdr.type == tioShutdown) {			printd("Got a shutdown\n");			ioHdr.type = tioShutdown;			ioHdr.key = 0;			ioHdr.offset = 0;			ioHdr.len = 0;			/*wrap in sender mutex? 
*/			data_xmit(DXwrite, tio->sock, &ioHdr,				  sizeof(theader_t));			tio->recvd_run = 0;			empty_transferQ_list(tio);			break;		} else			/* Received a DataReply,			 * Pull down the data, and stuff it into the receive buffer that was			 * set up in the tansferQ structure way back in trans_io().			 */		if (ioHdr.type == tioReadRpl || ioHdr.type == tioWriteRpl) {			transferQ_t *tmp;			printd("Got a DataReply\n");			/* find the matched receiver struct */			down(&tio->tqmux);			/*first find. */			for (tmp = tio->Rhead;			     tmp != NULL &&			     tmp->hdr.key != ioHdr.key; tmp = tmp->next) ;			if (tmp == NULL) {				printe				    ("ACK!! we couldn't match the incomming header with an "				     "existing request!\n" "   key    = 0x%llx\n"				     "   len    = 0x%x\n   offset = 0x%llx\n"				     "   err    = 0x%x\n" "   type   = 0x%x\n",				     ioHdr.key, ioHdr.len, ioHdr.offset,				     ioHdr.err, ioHdr.type);				printe("Expect to crash very soon now......\n");				/* I guess the best thing to do is to read in the extra crap,				 * and bit bucket it.  I assume that it will reread if needed.				 * Or perhaps somehow someone on the list durring the resend				 * got on there when it wasn't supposed to.....				 */				goto shutdowntransiod;			}			{	/* Then delete */				transferQ_t *temp, *last = NULL;				if (tio->Rhead == tmp) {					tio->Rhead = tio->Rhead->next;				} else {					for (temp = tio->Rhead; temp;					     last = temp, temp = temp->next) {						if (temp == tmp) {							last->next = temp->next;						}					}				}			}			up(&tio->tqmux);			printd("found request pointer, key is 0x%llx\n",			       tmp->hdr.key);			if (tio->sock == NULL)				break;			if (ioHdr.len > tmp->hdr.len) {				printe				    ("Ack! 
Server is sending %d bytes, but I was expecting "				     "only %d bytes.\n", ioHdr.len,				     tmp->hdr.len);				printe("Expect to crash very soon now......\n");			}			if (ioHdr.err != 0)				tmp->req->errors = ioHdr.err;			if ((tmp->hdr.type == tioReadReq) && (ioHdr.len != 0)) {				printd("Receiving data\n");				err = data_xmit(DXread, tio->sock, tmp->req->buffer,						min(ioHdr.len, tmp->hdr.len));				if (err < 0) {#if 0					tmp->req->errors = err;#endif					printe("Error reading Data(%d)\n", err);				}				/* restartable errors */				if (err == 0 || err == -ECONNRESET				    || err == -EPIPE) {					tio->good_connect = 0;					goto shutdowntransiod;				}			}			printd("done with request %p\n", tmp->req);			my_end_request(tmp->req);			kfree(tmp);	/* all done with this */		} else {	/* unrecognised header type. */			printe("Unrcognised header type of 0x%x\n", ioHdr.type);			if (ioHdr.len != 0) {				printe				    ("Oh gag! there's %d bytes of data following it as well "				     "Well, I'm ignoring it.\n", ioHdr.len);				printe("Expect to crash very soon now......\n");			}		}	}			/*while(tio->recvd_run) */      shutdowntransiod:#if 0				/* just because we made it here, does not mean we want				   to give it all up. */	empty_transferQ_list(tio);#endif	if (tio->sock != NULL)		sock_release(tio->sock);	tio->sock = NULL;	tio->recvd_run = 0;	up(&tio->tiodmux);	unlock_kernel();	return 0;}/* * resend_requests() * * For each pending request in the transferQ, steps through and re-issues * that request to the server. * This function is only intended to be called if we lost a server * connection and are reestablishing a new connection. 
*/
static void
resend_requests(gnbd_t * dev)
{
	transferQ_t *tmp;
	theader_t ioHdr;
	int err;
	int i = 0;

	printe("=>Scan pending");
	for (tmp = dev->Rhead; tmp; tmp = tmp->next) {
		i++;
		printk(".");
		ASSERT(tmp != NULL);

		/* rebuild the wire header from the saved host-order copy */
		ioHdr.type = tmp->hdr.type;
		ioHdr.key = tmp->hdr.key;
		ioHdr.err = 0;
		ioHdr.len = cpu_to_be32(tmp->hdr.len);
		ioHdr.offset = cpu_to_be64(tmp->hdr.offset);

		ASSERT(dev->sock != NULL);
		ASSERT(tmp->req->buffer != NULL);
		printd("Re-requesting key 0x%llx\n", ioHdr.key);

		down(&dev->sender);
		err = data_xmit(DXwrite, dev->sock, &ioHdr, sizeof(theader_t));
		if (err <= 0) {
			printe("error resending request %d. ", i);
			if (dev->sock == NULL)
				printe
				    ("It looks like the socket has been closed\n");
			else
				printe
				    ("It looks like the connection snapped\n");
			/*
			 * The header did not go out, so do not follow it with a
			 * payload and do not keep writing to this connection.
			 * The remaining entries stay on the pending list.
			 */
			up(&dev->sender);
			break;
		}
		if (tmp->req->cmd == WRITE)
			data_xmit(DXwrite, dev->sock, tmp->req->buffer,
				  tmp->hdr.len);
		up(&dev->sender);
	}
	printk("\n");
}

/*
 * trans_login()
 *
 * Holds the fancy stuff that we use to log into a server.  This runs
 * before the transiod thread is created.
 *
 * Right now all we do is get the info structure from the server and copy
 * the device size and read-only flag into dev.
 *
 * Returns a positive value on success and a negative errno on failure.
 * A connection that closes before the whole structure arrived is reported
 * as -ECONNRESET.
 */
static int
trans_login(gnbd_t * dev)
{
	int err = 0;
	tio_info_t tinfo;

	/* wait for server info */
	err = data_xmit(DXread, dev->sock, &tinfo, sizeof(tio_info_t));
	/*
	 * data_xmit() returns 0 when the transfer stopped before all bytes
	 * moved; tinfo is not fully filled in then and must not be used.
	 */
	if (err == 0)
		err = -ECONNRESET;
	if (err < 0)
		goto exit_trans_login;

	dev->size = be64_to_cpu(tinfo.devsize);
	dev->readonly = tinfo.readonly;

      exit_trans_login:
	return err;
}

/*
 * trans_start_up()
 * starts up the trans_recvd thread after initializing the mutexes.
 *
 * This is also now the reconnect. This is simply just a call to
 * resend_requests if any requests are pending.
*/
int
trans_start_up(gnbd_t * dev)
{
	int err = 0;
	int login;
	struct sockaddr_in addr;

	err = sock_create(AF_INET, SOCK_STREAM, 0, &dev->sock);
	if (err) {
		printe("Failed to create socket (%d)\n", err);
		goto exit_trans_startup;
	}

	/* servip / servport are used as stored; no byte swapping is done here */
	addr.sin_family = AF_INET;
	addr.sin_addr.s_addr = dev->servip;
	addr.sin_port = dev->servport;
	err = dev->sock->ops->connect(dev->sock, (struct sockaddr *) &addr,
				      sizeof(addr), 0);
	if (err) {
		sock_release(dev->sock);
		dev->sock = NULL;
		printe("Unable to connect to server (%d)\n", err);
		goto exit_trans_startup;
	}

	/* setup some other things 
	 * These need to be done once. when the device is inited back in
	 * init_module.
	 */
	init_MUTEX_LOCKED(&dev->tiodmux);

	/*
	 * Keep the login result.  The original discarded it, so a failed
	 * login logged a stale 0 and this function returned success with
	 * dev->sock already released.  A result of 0 means the connection
	 * closed during login and is reported as -ECONNRESET.
	 */
	login = trans_login(dev);
	if (login <= 0) {
		err = (login < 0) ? login : -ECONNRESET;
		sock_release(dev->sock);
		dev->sock = NULL;
		printe("Login to server failed. %d\n", err);
		goto exit_trans_startup;
	}

	/* start up transiod thread */
	if (!dev->recvd_run) {
		dev->recvd_run = 1;
		err = kernel_thread(transiod, dev, 0);
		if (err < 0) {
			sock_release(dev->sock);
			dev->sock = NULL;
			printe("Couldn't start transiod thread (%d)\n", err);
			dev->recvd_run = 0;
			goto exit_trans_startup;
		}
		/* wait for thread to get itself set up before continuing. */
		down(&dev->tiodmux);
		wake_up(&dev->gassed);
	}

	/* before or after the thread exists? */
	if (ISgnbdPending(dev))
		resend_requests(dev);
	printd("resending requests\n");
	gnbd_request(&blk_dev[gnbd_major].request_queue);

	/*
	 * NOTE(review): when the thread was started above, err still holds
	 * the non-negative kernel_thread() result here -- confirm callers
	 * test for err < 0 rather than err != 0.
	 */
      exit_trans_startup:
	return err;
}

/*
 * trans_shutdown()
 *
 * In order to get the transiod out of its sleep on the socket, we send a
 * Shutdown header.  This causes the server to reply with a shutdown
 * header, and then kill that thread.  We see the second Shutdown header,
 * and since we flipped the recvd_run flag off, we also exit.
 * Now we can close the socket and be on our merry way.
*/
void
trans_shutdown(gnbd_t * tio, int complete)
{
	theader_t nop;

	if (tio->recvd_run) {	/* don't do this if it already stopped */
		tio->recvd_run = 0;

		/*
		 * Clear the whole header first: the original left nop.err
		 * unset, so uninitialised stack bytes went out on the wire.
		 */
		memset(&nop, 0, sizeof(nop));
		nop.type = tioShutdown;
		nop.key = 0;
		nop.len = 0;
		nop.offset = 0;
		nop.err = 0;

		down(&tio->sender);	/* make sure no one is half way through a send */
		data_xmit(DXwrite, tio->sock, (char *) &nop, sizeof(theader_t));
		up(&tio->sender);

		/* transiod() ups tiodmux on its way out */
		down(&tio->tiodmux);
	}

	/* do this to make sure things are down. */
	if (complete)
		empty_transferQ_list(tio);
	if (tio->sock != NULL)
		sock_release(tio->sock);
	tio->sock = NULL;
}

/* Largest number of bytes handed to the socket layer in a single call. */
enum { DX_MAX_CHUNK = 0xFFFF };

/*
 * data_xmit()
 *
 * This function sends or recvs the specified amount of data.  It exists
 * mostly as an abstraction to keep my mind from losing it any more than it
 * already has.
 * This was based on the ndb xmit function, but I added a peek feature
 * which I don't use anymore.  I've hacked on it in other ways though.
 *
 *   send - DXwrite to send, DXread to receive, DXpeek to receive with
 *          MSG_PEEK
 *   sock - socket to use
 *   buf  - kernel buffer to send from / receive into
 *   size - total number of bytes to move
 *
 * Loops until all size bytes have moved or a call returns <= 0.  Returns
 * the result of the last socket call: positive when everything moved,
 * otherwise the 0 or negative value that stopped the loop.  All signals
 * are blocked for the duration of the transfer and restored afterwards.
 */
static int
data_xmit(DXtype send, struct socket *sock, void *buf, u32 size)
{
	mm_segment_t oldfs;
	int result = -EIO;
	struct msghdr msg;
	struct iovec iov;
	unsigned long flags;
	sigset_t oldset;
	char *pos = buf;	/* byte pointer, so advancing it is plain C */
	u32 chunk;

	/* buf is a kernel address; widen the address limit for the socket calls */
	oldfs = get_fs();
	set_fs(get_ds());

	spin_lock_irqsave(&current->sigmask_lock, flags);
	oldset = current->blocked;
	sigfillset(&current->blocked);
	recalc_sigpending(current);
	spin_unlock_irqrestore(&current->sigmask_lock, flags);

	do {
		/*
		 * Clamp each call to DX_MAX_CHUNK bytes.  The original cast
		 * the length to u16, which turned any size that is a multiple
		 * of 65536 into a zero-length transfer; that returned 0 and
		 * was taken for a closed connection.
		 */
		chunk = (size > DX_MAX_CHUNK) ? DX_MAX_CHUNK : size;

		iov.iov_base = pos;
		iov.iov_len = chunk;
		msg.msg_name = NULL;
		msg.msg_namelen = 0;
		msg.msg_iov = &iov;
		msg.msg_iovlen = 1;
		msg.msg_control = NULL;
		msg.msg_controllen = 0;
		msg.msg_flags = 0;

		if (send == DXwrite)
			result = sock_sendmsg(sock, &msg, chunk);
		else if (send == DXread)
			result = sock_recvmsg(sock, &msg, chunk, 0);
		else if (send == DXpeek)
			result = sock_recvmsg(sock, &msg, chunk, MSG_PEEK);

		if (result <= 0) {
			printe("%s - sock=%p at buf=%p, size=%u returned %d.\n",
			       DXstrs[send], sock, pos, size, result);
			break;
		}
		size -= (u32) result;
		pos += result;
	} while (size > 0);

	spin_lock_irqsave(&current->sigmask_lock, flags);
	current->blocked = oldset;
	recalc_sigpending(current);
	spin_unlock_irqrestore(&current->sigmask_lock, flags);

	set_fs(oldfs);
	return result;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -