📄 socket.c
字号:
{ int i; isc_socket_t *sock; isc_boolean_t unlock_sock; REQUIRE(maxfd <= (int)FD_SETSIZE); /* * Process read/writes on other fds here. Avoid locking * and unlocking twice if both reads and writes are possible. */ for (i = 0; i < maxfd; i++) {#ifdef ISC_PLATFORM_USETHREADS if (i == manager->pipe_fds[0] || i == manager->pipe_fds[1]) continue;#endif /* ISC_PLATFORM_USETHREADS */ if (manager->fdstate[i] == CLOSE_PENDING) { manager->fdstate[i] = CLOSED; FD_CLR(i, &manager->read_fds); FD_CLR(i, &manager->write_fds); (void)close(i); continue; } sock = manager->fds[i]; unlock_sock = ISC_FALSE; if (FD_ISSET(i, readfds)) { if (sock == NULL) { FD_CLR(i, &manager->read_fds); goto check_write; } unlock_sock = ISC_TRUE; LOCK(&sock->lock); if (!SOCK_DEAD(sock)) { if (sock->listener) dispatch_accept(sock); else dispatch_recv(sock); } FD_CLR(i, &manager->read_fds); } check_write: if (FD_ISSET(i, writefds)) { if (sock == NULL) { FD_CLR(i, &manager->write_fds); continue; } if (!unlock_sock) { unlock_sock = ISC_TRUE; LOCK(&sock->lock); } if (!SOCK_DEAD(sock)) { if (sock->connecting) dispatch_connect(sock); else dispatch_send(sock); } FD_CLR(i, &manager->write_fds); } if (unlock_sock) UNLOCK(&sock->lock); }}#ifdef ISC_PLATFORM_USETHREADS/* * This is the thread that will loop forever, always in a select or poll * call. * * When select returns something to do, track down what thread gets to do * this I/O and post the event to it. */static isc_threadresult_twatcher(void *uap) { isc_socketmgr_t *manager = uap; isc_boolean_t done; int ctlfd; int cc; fd_set readfds; fd_set writefds; int msg, fd; int maxfd; char strbuf[ISC_STRERRORSIZE]; /* * Get the control fd here. This will never change. */ LOCK(&manager->lock); ctlfd = manager->pipe_fds[0]; done = ISC_FALSE; while (!done) { do { readfds = manager->read_fds; writefds = manager->write_fds; maxfd = manager->maxfd + 1; UNLOCK(&manager->lock); cc = select(maxfd, &readfds, &writefds, NULL, NULL); if (cc < 0) { if (!SOFT_ERROR(errno)) { isc__strerror(errno, strbuf, sizeof(strbuf)); FATAL_ERROR(__FILE__, __LINE__, "select() %s: %s", isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_FAILED, "failed"), strbuf); } } LOCK(&manager->lock); } while (cc < 0); /* * Process reads on internal, control fd. */ if (FD_ISSET(ctlfd, &readfds)) { for (;;) { select_readmsg(manager, &fd, &msg); manager_log(manager, IOEVENT, isc_msgcat_get(isc_msgcat, ISC_MSGSET_SOCKET, ISC_MSG_WATCHERMSG, "watcher got message %d"), msg); /* * Nothing to read? */ if (msg == SELECT_POKE_NOTHING) break; /* * Handle shutdown message. We really should * jump out of this loop right away, but * it doesn't matter if we have to do a little * more work first. */ if (msg == SELECT_POKE_SHUTDOWN) { done = ISC_TRUE; break; } /* * This is a wakeup on a socket. Look * at the event queue for both read and write, * and decide if we need to watch on it now * or not. */ wakeup_socket(manager, fd, msg); } } process_fds(manager, maxfd, &readfds, &writefds); } manager_log(manager, TRACE, isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_EXITING, "watcher exiting")); UNLOCK(&manager->lock); return ((isc_threadresult_t)0);}#endif /* ISC_PLATFORM_USETHREADS *//* * Create a new socket manager. */isc_result_tisc_socketmgr_create(isc_mem_t *mctx, isc_socketmgr_t **managerp) { isc_socketmgr_t *manager;#ifdef ISC_PLATFORM_USETHREADS char strbuf[ISC_STRERRORSIZE];#endif REQUIRE(managerp != NULL && *managerp == NULL);#ifndef ISC_PLATFORM_USETHREADS if (socketmgr != NULL) { socketmgr->refs++; *managerp = socketmgr; return (ISC_R_SUCCESS); }#endif /* ISC_PLATFORM_USETHREADS */ manager = isc_mem_get(mctx, sizeof(*manager)); if (manager == NULL) return (ISC_R_NOMEMORY); manager->magic = SOCKET_MANAGER_MAGIC; manager->mctx = NULL; memset(manager->fds, 0, sizeof(manager->fds)); ISC_LIST_INIT(manager->socklist); if (isc_mutex_init(&manager->lock) != ISC_R_SUCCESS) { isc_mem_put(mctx, manager, sizeof(*manager)); UNEXPECTED_ERROR(__FILE__, __LINE__, "isc_mutex_init() %s", isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_FAILED, "failed")); return (ISC_R_UNEXPECTED); }#ifdef ISC_PLATFORM_USETHREADS if (isc_condition_init(&manager->shutdown_ok) != ISC_R_SUCCESS) { DESTROYLOCK(&manager->lock); isc_mem_put(mctx, manager, sizeof(*manager)); UNEXPECTED_ERROR(__FILE__, __LINE__, "isc_condition_init() %s", isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_FAILED, "failed")); return (ISC_R_UNEXPECTED); } /* * Create the special fds that will be used to wake up the * select/poll loop when something internal needs to be done. */ if (pipe(manager->pipe_fds) != 0) { DESTROYLOCK(&manager->lock); isc_mem_put(mctx, manager, sizeof(*manager)); isc__strerror(errno, strbuf, sizeof(strbuf)); UNEXPECTED_ERROR(__FILE__, __LINE__, "pipe() %s: %s", isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_FAILED, "failed"), strbuf); return (ISC_R_UNEXPECTED); } RUNTIME_CHECK(make_nonblock(manager->pipe_fds[0]) == ISC_R_SUCCESS);#if 0 RUNTIME_CHECK(make_nonblock(manager->pipe_fds[1]) == ISC_R_SUCCESS);#endif#else /* ISC_PLATFORM_USETHREADS */ manager->refs = 1;#endif /* ISC_PLATFORM_USETHREADS */ /* * Set up initial state for the select loop */ FD_ZERO(&manager->read_fds); FD_ZERO(&manager->write_fds);#ifdef ISC_PLATFORM_USETHREADS FD_SET(manager->pipe_fds[0], &manager->read_fds); manager->maxfd = manager->pipe_fds[0];#else /* ISC_PLATFORM_USETHREADS */ manager->maxfd = 0;#endif /* ISC_PLATFORM_USETHREADS */ memset(manager->fdstate, 0, sizeof(manager->fdstate));#ifdef ISC_PLATFORM_USETHREADS /* * Start up the select/poll thread. */ if (isc_thread_create(watcher, manager, &manager->watcher) != ISC_R_SUCCESS) { (void)close(manager->pipe_fds[0]); (void)close(manager->pipe_fds[1]); DESTROYLOCK(&manager->lock); isc_mem_put(mctx, manager, sizeof(*manager)); UNEXPECTED_ERROR(__FILE__, __LINE__, "isc_thread_create() %s", isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_FAILED, "failed")); return (ISC_R_UNEXPECTED); }#endif /* ISC_PLATFORM_USETHREADS */ isc_mem_attach(mctx, &manager->mctx);#ifndef ISC_PLATFORM_USETHREADS socketmgr = manager;#endif /* ISC_PLATFORM_USETHREADS */ *managerp = manager; return (ISC_R_SUCCESS);}voidisc_socketmgr_destroy(isc_socketmgr_t **managerp) { isc_socketmgr_t *manager; int i; isc_mem_t *mctx; /* * Destroy a socket manager. */ REQUIRE(managerp != NULL); manager = *managerp; REQUIRE(VALID_MANAGER(manager));#ifndef ISC_PLATFORM_USETHREADS if (manager->refs > 1) { manager->refs--; *managerp = NULL; return; }#endif /* ISC_PLATFORM_USETHREADS */ LOCK(&manager->lock);#ifdef ISC_PLATFORM_USETHREADS /* * Wait for all sockets to be destroyed. */ while (!ISC_LIST_EMPTY(manager->socklist)) { manager_log(manager, CREATION, isc_msgcat_get(isc_msgcat, ISC_MSGSET_SOCKET, ISC_MSG_SOCKETSREMAIN, "sockets exist")); WAIT(&manager->shutdown_ok, &manager->lock); }#else /* ISC_PLATFORM_USETHREADS */ /* * Hope all sockets have been destroyed. */ if (!ISC_LIST_EMPTY(manager->socklist)) { manager_log(manager, CREATION, isc_msgcat_get(isc_msgcat, ISC_MSGSET_SOCKET, ISC_MSG_SOCKETSREMAIN, "sockets exist")); INSIST(0); }#endif /* ISC_PLATFORM_USETHREADS */ UNLOCK(&manager->lock); /* * Here, poke our select/poll thread. Do this by closing the write * half of the pipe, which will send EOF to the read half. * This is currently a no-op in the non-threaded case. */ select_poke(manager, 0, SELECT_POKE_SHUTDOWN);#ifdef ISC_PLATFORM_USETHREADS /* * Wait for thread to exit. */ if (isc_thread_join(manager->watcher, NULL) != ISC_R_SUCCESS) UNEXPECTED_ERROR(__FILE__, __LINE__, "isc_thread_join() %s", isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_FAILED, "failed"));#endif /* ISC_PLATFORM_USETHREADS */ /* * Clean up. */#ifdef ISC_PLATFORM_USETHREADS (void)close(manager->pipe_fds[0]); (void)close(manager->pipe_fds[1]); (void)isc_condition_destroy(&manager->shutdown_ok);#endif /* ISC_PLATFORM_USETHREADS */ for (i = 0; i < (int)FD_SETSIZE; i++) if (manager->fdstate[i] == CLOSE_PENDING) (void)close(i); DESTROYLOCK(&manager->lock); manager->magic = 0; mctx= manager->mctx; isc_mem_put(mctx, manager, sizeof(*manager)); isc_mem_detach(&mctx); *managerp = NULL;}static isc_result_tsocket_recv(isc_socket_t *sock, isc_socketevent_t *dev, isc_task_t *task, unsigned int flags){ int io_state; isc_boolean_t have_lock = ISC_FALSE; isc_task_t *ntask = NULL; isc_result_t result = ISC_R_SUCCESS; dev->ev_sender = task; if (sock->type == isc_sockettype_udp) { io_state = doio_recv(sock, dev); } else { LOCK(&sock->lock); have_lock = ISC_TRUE; if (ISC_LIST_EMPTY(sock->recv_list)) io_state = doio_recv(sock, dev); else io_state = DOIO_SOFT; } switch (io_state) { case DOIO_SOFT: /* * We couldn't read all or part of the request right now, so * queue it. * * Attach to socket and to task */ isc_task_attach(task, &ntask); dev->attributes |= ISC_SOCKEVENTATTR_ATTACHED; if (!have_lock) { LOCK(&sock->lock); have_lock = ISC_TRUE; } /* * Enqueue the request. If the socket was previously not being * watched, poke the watcher to start paying attention to it. */ if (ISC_LIST_EMPTY(sock->recv_list)) select_poke(sock->manager, sock->fd, SELECT_POKE_READ); ISC_LIST_ENQUEUE(sock->recv_list, dev, ev_link); socket_log(sock, NULL, EVENT, NULL, 0, 0, "socket_recv: event %p -> task %p", dev, ntask); if ((flags & ISC_SOCKFLAG_IMMEDIATE) != 0) result = ISC_R_INPROGRESS; break; case DOIO_EOF: dev->result = ISC_R_EOF; /* fallthrough */ case DOIO_HARD: case DOIO_SUCCESS: if ((flags & ISC_SOCKFLAG_IMMEDIATE) == 0) send_recvdone_event(sock, &dev); break; } if (have_lock) UNLOCK(&sock->lock); return (result);}isc_result_tisc_socket_recvv(isc_socket_t *sock, isc_bufferlist_t *buflist, unsigned int minimum, isc_task_t *task, isc_taskaction_t action, const void *arg){ isc_socketevent_t *dev; isc_socketmgr_t *manager; unsigned int iocount; isc_buffer_t *buffer; REQUIRE(VALID_SOCKET(sock)); REQUIRE(buflist != NULL); REQUIRE(!ISC_LIST_EMPTY(*buflist)); REQUIRE(task != NULL); REQUIRE(action != NULL); manager = sock->manager; REQUIRE(VALID_MANAGER(manager)); iocount = isc_bufferlist_availablecount(buflist); REQUIRE(iocount > 0); INSIST(sock->bound); dev = allocate_socketevent(sock, ISC_SOCKEVENT_RECVDONE, action, arg); if (dev == NULL) { return (ISC_R_NOMEMORY); } /* * UDP sockets are always partial read */ if (sock->type == isc_sockettype_udp) dev->minimum = 1; else { if (minimum == 0) dev->minimum = iocount; else dev->minimum = minimum; } /* * Move each buffer from the passed in list to our internal one. */ buffer = ISC_LIST_HEAD(*buflist); while (buffer != NULL) { ISC_LIST_DEQUEUE(*buflist, buffer, link); ISC_LIST_ENQUEUE(dev->bufferlist, buffer, link); buffer = ISC_LIST_HEAD(*buflist); } return (socket_recv(sock, dev, task, 0));}isc_result_tisc_socket_recv(isc_socket_t *sock, isc_region_t *region, unsigned int minimum, isc_task_t *task, isc_taskaction_t action, const void *arg){ isc_socketevent_t *dev; isc_socketmgr_t *manager; REQUIRE(VALID_SOCKET(sock)); REQUIRE(action != NULL); manager = sock->manager; REQUIRE(VALID_MANAGER(manager)); INSIST(sock->bound); dev = allocate_socketevent(sock, ISC_SOCKEVENT_RECVDONE, action, arg); if (dev == NULL) return (ISC_R_NOMEMORY); return (isc_socket_recv2(sock, region, minimum, task, dev, 0));
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -