📄 gnbd_trans.c
字号:
/* * * Copyright 1999 Regents of the University of Minnesota * Portions Copyright 1999-2001 Sistina Software, Inc. * Portions Copyright 2001 The OpenGFS Project * * This is free software released under the GNU General Public License. * There is no warranty for this software. See the file COPYING for * details. * * See the file AUTHORS for a list of contributors. * *//* * This is where all of the actual data io between server and client happens. */#define __KERNEL_SYSCALLS__#include <linux/module.h>#include <linux/blkdev.h>#include <linux/blk.h>#include <linux/kernel.h>#include <linux/slab.h>#include <linux/fs.h>#include <linux/string.h>#include <linux/smp_lock.h>#include <linux/unistd.h>#include <linux/net.h>#include <linux/in.h>#include <linux/socket.h>#include <net/sock.h>#include <asm/uaccess.h>#include "trans.h"#include "gnbd_macros.h"#include "gnbd_kern.h"#include "gnbd.h"/* * prototype the data_xmit funtion here so I can leave it at the bottom of * this file. */typedef enum { DXwrite = 0, DXread, DXpeek} DXtype;static int data_xmit(DXtype send, struct socket *sock, void *buf, u32 size);static char *DXstrs[] = { "write", "read", "peek" };/* * cmd_RW() */voidcmd_RW(gnbd_t * tio, struct request *req, u32 len, u64 offset){ transferQ_t *tsQ; theader_t ioHdr; int err = 0; ASSERT(tio != NULL); ASSERT(req != NULL); ASSERT(tio->sock != NULL); ASSERT(req->buffer != NULL); if (!tio->recvd_run) { req->errors = -ESHUTDOWN; my_end_request(req); goto exit_cmd_rw; } tsQ = kmalloc(sizeof(transferQ_t), GFP_KERNEL); if (!tsQ) { req->errors = -ENOMEM; my_end_request(req); goto exit_cmd_rw; } /*Setup reply space */ tsQ->hdr.type = (req->cmd == READ) ? tioReadReq : tioWriteReq; tsQ->hdr.key = (unsigned long) tsQ; tsQ->hdr.len = len; tsQ->hdr.offset = offset; tsQ->hdr.err = 0; tsQ->req = req; tsQ->next = NULL; /*build header */ ioHdr.type = (req->cmd == READ) ? tioReadReq : tioWriteReq; ioHdr.err = 0; ioHdr.key = tsQ->hdr.key; /* untouch by server; don't need byte order */ ioHdr.offset = cpu_to_be64(offset); ioHdr.len = cpu_to_be32(len); /* for a read this is the amount we want. NOT the amount following. */ printd("cmd_RW: cmd=%x buffer=0x%p key=0x%llx offset=0x%llx len=0x%x\n" req->cmd, req->buffer, tsQ->hdr.key, offset, len);#if 0 /*Queue reply */ down(&tio->tqmux); tsQ->next = tio->Rhead; tio->Rhead = tsQ; up(&tio->tqmux);#endif /* FIXME: problem is here. durring a reconnect, requests can be queued * before the re-requests are made. This allows a request to be sent * twice. And thus received twice, which causes problems. * I wonder if I should just nest these. Gonna have to make sure I can * do that, but it may be the best solution. (or it may not) */ /* Does a simular problem exist with the new reconnect? Have to find * out. * Have to get it working first.... */ /*send buffer */ printd("About to send request data\n"); spin_unlock_irq(&io_request_lock); down(&tio->sender); err = data_xmit(DXwrite, tio->sock, &ioHdr, sizeof(theader_t)); if (req->cmd == WRITE) err = data_xmit(DXwrite, tio->sock, req->buffer, len); up(&tio->sender); spin_lock_irq(&io_request_lock); if (err == 0 || err == -ECONNRESET || err == -EPIPE) /* shutdown transiod. */ trans_shutdown(tio, 0); /* I wonder is putting this down here opens the possibility of this * getting recieved before I can put it onto the queue... */ /*Queue reply */ down(&tio->tqmux); tsQ->next = tio->Rhead; tio->Rhead = tsQ; up(&tio->tqmux); exit_cmd_rw: return;}/* * empty_transferQ_list() * Clears out and pending requests by setting the err, ending the request, * and then freeing the memory. */voidempty_transferQ_list(gnbd_t * tio){ transferQ_t *tmp, *kill = NULL; printd("Entering empty_transferQ_list\n"); if (tio == NULL) return; if (!tio->Rhead) return; printd("Proceding to empty transferQ\n"); down(&tio->tqmux); printd("Got tqmux, going to town.\n"); tmp = tio->Rhead; while (tmp != NULL) { kill = tmp; tmp = tmp->next; if (kill->req == NULL) { printe("kill(%p)->req was null\n", kill); kfree(kill); continue; } kill->req->errors = -ESHUTDOWN; my_end_request(kill->req); printd("freeing pointer 0x%p\n", kill); kfree(kill); } tio->Rhead = NULL; up(&tio->tqmux);}/* * transiod() * This thread handles the incomming data on the socket. * There will be a version of this thread running for each device opened. */inttransiod(void *data){ gnbd_t *tio = (gnbd_t *) data; theader_t ioHdr; int err; /* Set up the thread */ lock_kernel(); daemonize(); sprintf(current->comm, "gnbd_transiod"); up(&tio->tiodmux); printd("starting transiod loop\n"); /* now wait for data */ while (tio->recvd_run) { printd("==top of transiod loop\n"); ASSERT(tio != NULL); if (tio->sock == NULL) break; err = data_xmit(DXread, tio->sock, &ioHdr, sizeof(theader_t)); printd("Got header.\n"); if (err <= 0) { /* if read 0, socket closed. */ printe("Error trying to read header (%d)\n", err); tio->good_connect = 0; goto shutdowntransiod; } if (!tio->recvd_run) goto shutdowntransiod; be32_to_cpus(&ioHdr.len); /* Received a shutdown command from the server, reply with a shutdown * and stop this thread. */ if (ioHdr.type == tioShutdown) { printd("Got a shutdown\n"); ioHdr.type = tioShutdown; ioHdr.key = 0; ioHdr.offset = 0; ioHdr.len = 0; /*wrap in sender mutex? */ data_xmit(DXwrite, tio->sock, &ioHdr, sizeof(theader_t)); tio->recvd_run = 0; empty_transferQ_list(tio); break; } else /* Received a DataReply, * Pull down the data, and stuff it into the receive buffer that was * set up in the tansferQ structure way back in trans_io(). */ if (ioHdr.type == tioReadRpl || ioHdr.type == tioWriteRpl) { transferQ_t *tmp; printd("Got a DataReply\n"); /* find the matched receiver struct */ down(&tio->tqmux); /*first find. */ for (tmp = tio->Rhead; tmp != NULL && tmp->hdr.key != ioHdr.key; tmp = tmp->next) ; if (tmp == NULL) { printe ("ACK!! we couldn't match the incomming header with an " "existing request!\n" " key = 0x%llx\n" " len = 0x%x\n offset = 0x%llx\n" " err = 0x%x\n" " type = 0x%x\n", ioHdr.key, ioHdr.len, ioHdr.offset, ioHdr.err, ioHdr.type); printe("Expect to crash very soon now......\n"); /* I guess the best thing to do is to read in the extra crap, * and bit bucket it. I assume that it will reread if needed. * Or perhaps somehow someone on the list durring the resend * got on there when it wasn't supposed to..... */ goto shutdowntransiod; } { /* Then delete */ transferQ_t *temp, *last = NULL; if (tio->Rhead == tmp) { tio->Rhead = tio->Rhead->next; } else { for (temp = tio->Rhead; temp; last = temp, temp = temp->next) { if (temp == tmp) { last->next = temp->next; } } } } up(&tio->tqmux); printd("found request pointer, key is 0x%llx\n", tmp->hdr.key); if (tio->sock == NULL) break; if (ioHdr.len > tmp->hdr.len) { printe ("Ack! Server is sending %d bytes, but I was expecting " "only %d bytes.\n", ioHdr.len, tmp->hdr.len); printe("Expect to crash very soon now......\n"); } if (ioHdr.err != 0) tmp->req->errors = ioHdr.err; if ((tmp->hdr.type == tioReadReq) && (ioHdr.len != 0)) { printd("Receiving data\n"); err = data_xmit(DXread, tio->sock, tmp->req->buffer, min(ioHdr.len, tmp->hdr.len)); if (err < 0) {#if 0 tmp->req->errors = err;#endif printe("Error reading Data(%d)\n", err); } /* restartable errors */ if (err == 0 || err == -ECONNRESET || err == -EPIPE) { tio->good_connect = 0; goto shutdowntransiod; } } printd("done with request %p\n", tmp->req); my_end_request(tmp->req); kfree(tmp); /* all done with this */ } else { /* unrecognised header type. */ printe("Unrcognised header type of 0x%x\n", ioHdr.type); if (ioHdr.len != 0) { printe ("Oh gag! there's %d bytes of data following it as well " "Well, I'm ignoring it.\n", ioHdr.len); printe("Expect to crash very soon now......\n"); } } } /*while(tio->recvd_run) */ shutdowntransiod:#if 0 /* just because we made it here, does not mean we want to give it all up. */ empty_transferQ_list(tio);#endif if (tio->sock != NULL) sock_release(tio->sock); tio->sock = NULL; tio->recvd_run = 0; up(&tio->tiodmux); unlock_kernel(); return 0;}/* * resend_requests() * * For each pending request in the transferQ, steps through and re-issues * that request to the server. * This function is only intended to be called if we lost a server * connection and are reestablishing a new connection. */static voidresend_requests(gnbd_t * dev){ transferQ_t *tmp; theader_t ioHdr; int err; int i = 0; printe("=>Scan pending"); for (tmp = dev->Rhead; tmp; tmp = tmp->next) { i++; printk("."); ASSERT(tmp != NULL); ioHdr.type = tmp->hdr.type; ioHdr.key = tmp->hdr.key; ioHdr.err = 0; ioHdr.len = cpu_to_be32(tmp->hdr.len); ioHdr.offset = cpu_to_be64(tmp->hdr.offset); ASSERT(dev->sock != NULL); ASSERT(tmp->req->buffer != NULL); printd("Re-requesting key 0x%llx\n", ioHdr.key); down(&dev->sender); err = data_xmit(DXwrite, dev->sock, &ioHdr, sizeof(theader_t)); if (err <= 0) { printe("error resending request %d. ", i); if (dev->sock == NULL) printe ("It looks like the socket has been closed\n"); else printe ("It looks like the connection snapped\n"); } if (tmp->req->cmd == WRITE) data_xmit(DXwrite, dev->sock, tmp->req->buffer, tmp->hdr.len); up(&dev->sender); } printk("\n");}/* * trans_login() * * Holds the fancy stuff that we use to log into a server. This runs * before the transiod thread is created. * * Right now all we do is get the info structure from the server. */static inttrans_login(gnbd_t * dev){ int err = 0; tio_info_t tinfo; /* wait for server info */ err = data_xmit(DXread, dev->sock, &tinfo, sizeof(tio_info_t)); if (err < 0) goto exit_trans_login; dev->size = be64_to_cpu(tinfo.devsize); dev->readonly = tinfo.readonly; exit_trans_login: return err;}/* * trans_start_up() * starts up the trans_recvd thread after initializing the mutexes. * * This is also now the reconnect. This is simply just a call to * resend_requests if any requests are pending. */inttrans_start_up(gnbd_t * dev){ int err = 0; struct sockaddr_in addr; err = sock_create(AF_INET, SOCK_STREAM, 0, &dev->sock); if (err) { printe("Failed to create socket (%d)\n", err); goto exit_trans_startup; } addr.sin_family = AF_INET; addr.sin_addr.s_addr = dev->servip; addr.sin_port = dev->servport; err = dev->sock->ops->connect(dev->sock, (struct sockaddr *) &addr, sizeof(addr), 0); if (err) { sock_release(dev->sock); dev->sock = NULL; printe("Unable to connect to server (%d)\n", err); goto exit_trans_startup; } /* setup some other things * These need to be done once. when the device is inited back in * init_module. */ init_MUTEX_LOCKED(&dev->tiodmux); if (trans_login(dev) < 0) { sock_release(dev->sock); dev->sock = NULL; printe("Login to server failed. %d\n", err); goto exit_trans_startup; } /* start up transiod thread */ if (!dev->recvd_run) { dev->recvd_run = 1; err = kernel_thread(transiod, dev, 0); if (err < 0) { sock_release(dev->sock); dev->sock = NULL; printe("Couldn't start transiod thread (%d)\n", err); dev->recvd_run = 0; goto exit_trans_startup; } /* wait for thread to get itself set up before continuing. */ down(&dev->tiodmux); wake_up(&dev->gassed); } /* before or after the thread exists? */ if (ISgnbdPending(dev)) resend_requests(dev); printd("resending requests\n"); gnbd_request(&blk_dev[gnbd_major].request_queue); exit_trans_startup: return err;}/* * trans_shutdown() * * In order to get the transiod out of it's sleep on the socket, we send a * Shutdown header. This causes the server to reply with a shutdown * header, and then kill that thread. We see the second Shutdown header, * and since we flipped the recvd_run flag off, we also exit. * Now we can close the socket and be on our merry way. */voidtrans_shutdown(gnbd_t * tio, int complete){ theader_t nop; if (tio->recvd_run) { /* don't do this if it already stopped */ tio->recvd_run = 0; nop.type = tioShutdown; nop.key = 0; nop.len = 0; nop.offset = 0; down(&tio->sender); /* make sure noone is half way through a send */ data_xmit(DXwrite, tio->sock, (char *) &nop, sizeof(theader_t)); up(&tio->sender); down(&tio->tiodmux); } /* do this to make sure things are down. */ if (complete) empty_transferQ_list(tio); if (tio->sock != NULL) sock_release(tio->sock); tio->sock = NULL;}/* * data_xmit() * * This function sends or recvs the specified amount of data. It exists * mostly as an abstraction to keep my mind for losing it anymore than it * already has. * This was based on the ndb xmit function, but I added a peek feature * which I don't use anymore. I've hacked on it in other ways though. *****************************************************************************/static intdata_xmit(DXtype send, struct socket *sock, void *buf, u32 size){ mm_segment_t oldfs; int result = -EIO; struct msghdr msg; struct iovec iov; unsigned long flags; sigset_t oldset; oldfs = get_fs(); set_fs(get_ds()); spin_lock_irqsave(¤t->sigmask_lock, flags); oldset = current->blocked; sigfillset(¤t->blocked); recalc_sigpending(current); spin_unlock_irqrestore(¤t->sigmask_lock, flags); do { iov.iov_base = buf; iov.iov_len = (u16) size; msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = &iov; msg.msg_iovlen = 1; msg.msg_control = NULL; msg.msg_controllen = 0; msg.msg_namelen = 0; msg.msg_flags = 0; /* lookout. len param in socks is int. But we're doing a long. * could show problems here. (though would think the loop would * handle it.) */ if (send == DXwrite) result = sock_sendmsg(sock, &msg, (u16) size); else if (send == DXread) result = sock_recvmsg(sock, &msg, (u16) size, 0); else if (send == DXpeek) result = sock_recvmsg(sock, &msg, (u16) size, MSG_PEEK); if (result <= 0) { printe("%s - sock=%p at buf=%p, size=%u returned %d.\n", DXstrs[send], sock, buf, size, result); break; } size -= (u32) result; buf += result; } while (size > 0); spin_lock_irqsave(¤t->sigmask_lock, flags); current->blocked = oldset; recalc_sigpending(current); spin_unlock_irqrestore(¤t->sigmask_lock, flags); set_fs(oldfs); return result;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -