📄 pth_high.c
字号:
&& errno == EINTR) ;#endif } /* restore filedescriptor mode */ pth_shield { pth_fdmode(fd, fdmode); } pth_debug2("pth_writev_ev: leave to thread \"%s\"", pth_current->name); return rv;}/* calculate number of bytes in a struct iovec */intern ssize_t pth_writev_iov_bytes(const struct iovec *iov, int iovcnt){ ssize_t bytes; int i; bytes = 0; for (i = 0; i < iovcnt; i++) { if (iov[i].iov_len <= 0) continue; bytes += iov[i].iov_len; } return bytes;}/* advance the virtual pointer of a struct iov */intern void pth_writev_iov_advance(const struct iovec *riov, int riovcnt, size_t advance, struct iovec **liov, int *liovcnt, struct iovec *tiov, int tiovcnt){ int i; if (*liov == NULL && *liovcnt == 0) { /* initialize with real (const) structure on first step */ *liov = (struct iovec *)riov; *liovcnt = riovcnt; } if (advance > 0) { if (*liov == riov && *liovcnt == riovcnt) { /* reinitialize with a copy to be able to adjust it */ *liov = &tiov[0]; for (i = 0; i < riovcnt; i++) { tiov[i].iov_base = riov[i].iov_base; tiov[i].iov_len = riov[i].iov_len; } } /* advance the virtual pointer */ while (*liovcnt > 0 && advance > 0) { if ((*liov)->iov_len > advance) { (*liov)->iov_base = (char *)((*liov)->iov_base) + advance; (*liov)->iov_len -= advance; break; } else { advance -= (*liov)->iov_len; (*liovcnt)--; (*liov)++; } } } return;}/* A faked version of writev(2) */intern ssize_t pth_writev_faked(int fd, const struct iovec *iov, int iovcnt){ char *buffer, *cp; size_t bytes, to_copy, copy, rv; int i; /* determine total number of bytes to write */ bytes = 0; for (i = 0; i < iovcnt; i++) { if (iov[i].iov_len <= 0) return pth_error((ssize_t)(-1), EINVAL); bytes += iov[i].iov_len; } if (bytes <= 0) return pth_error((ssize_t)(-1), EINVAL); /* allocate a temporary buffer to hold the data */ if ((buffer = (char *)malloc(bytes)) == NULL) return (ssize_t)(-1); /* concatenate the data from callers vector into buffer */ to_copy = bytes; cp = buffer; for (i = 0; i < iovcnt; i++) { copy = pth_util_min(iov[i].iov_len, to_copy); memcpy(cp, iov[i].iov_base, copy); to_copy -= copy; if (to_copy <= 0) break; } /* write continuous chunck of data (caller guarrantied us to not block) */ rv = pth_sc(write)(fd, buffer, bytes); /* remove the temporary buffer */ pth_shield { free(buffer); } return(rv);}/* Pth variant of POSIX pread(3) */ssize_t pth_pread(int fd, void *buf, size_t nbytes, off_t offset){ static pth_mutex_t mutex = PTH_MUTEX_INIT; off_t old_offset; ssize_t rc; /* protect us: pth_read can yield! */ if (!pth_mutex_acquire(&mutex, FALSE, NULL)) return (-1); /* remember current offset */ if ((old_offset = lseek(fd, 0, SEEK_CUR)) == (off_t)(-1)) { pth_mutex_release(&mutex); return (-1); } /* seek to requested offset */ if (lseek(fd, offset, SEEK_SET) == (off_t)(-1)) { pth_mutex_release(&mutex); return (-1); } /* perform the read operation */ rc = pth_read(fd, buf, nbytes); /* restore the old offset situation */ pth_shield { lseek(fd, old_offset, SEEK_SET); } /* unprotect and return result of read */ pth_mutex_release(&mutex); return rc;}/* Pth variant of POSIX pwrite(3) */ssize_t pth_pwrite(int fd, const void *buf, size_t nbytes, off_t offset){ static pth_mutex_t mutex = PTH_MUTEX_INIT; off_t old_offset; ssize_t rc; /* protect us: pth_write can yield! */ if (!pth_mutex_acquire(&mutex, FALSE, NULL)) return (-1); /* remember current offset */ if ((old_offset = lseek(fd, 0, SEEK_CUR)) == (off_t)(-1)) { pth_mutex_release(&mutex); return (-1); } /* seek to requested offset */ if (lseek(fd, offset, SEEK_SET) == (off_t)(-1)) { pth_mutex_release(&mutex); return (-1); } /* perform the write operation */ rc = pth_write(fd, buf, nbytes); /* restore the old offset situation */ pth_shield { lseek(fd, old_offset, SEEK_SET); } /* unprotect and return result of write */ pth_mutex_release(&mutex); return rc;}/* Pth variant of SUSv2 recv(2) */ssize_t pth_recv(int s, void *buf, size_t len, int flags){ return pth_recv_ev(s, buf, len, flags, NULL);}/* Pth variant of SUSv2 recv(2) with extra event(s) */ssize_t pth_recv_ev(int s, void *buf, size_t len, int flags, pth_event_t ev){ return pth_recvfrom_ev(s, buf, len, flags, NULL, 0, ev);}/* Pth variant of SUSv2 recvfrom(2) */ssize_t pth_recvfrom(int s, void *buf, size_t len, int flags, struct sockaddr *from, socklen_t *fromlen){ return pth_recvfrom_ev(s, buf, len, flags, from, fromlen, NULL);}/* Pth variant of SUSv2 recvfrom(2) with extra event(s) */ssize_t pth_recvfrom_ev(int fd, void *buf, size_t nbytes, int flags, struct sockaddr *from, socklen_t *fromlen, pth_event_t ev_extra){ struct timeval delay; pth_event_t ev; static pth_key_t ev_key = PTH_KEY_INIT; fd_set fds; int fdmode; int n; pth_implicit_init(); pth_debug2("pth_recvfrom_ev: enter from thread \"%s\"", pth_current->name); /* POSIX compliance */ if (nbytes == 0) return 0; if (!pth_util_fd_valid(fd)) return pth_error(-1, EBADF); /* check mode of filedescriptor */ if ((fdmode = pth_fdmode(fd, PTH_FDMODE_POLL)) == PTH_FDMODE_ERROR) return pth_error(-1, EBADF); /* poll filedescriptor if not already in non-blocking operation */ if (fdmode == PTH_FDMODE_BLOCK) { /* now directly poll filedescriptor for readability to avoid unneccessary (and resource consuming because of context switches, etc) event handling through the scheduler */ if (!pth_util_fd_valid(fd)) return pth_error(-1, EBADF); FD_ZERO(&fds); FD_SET(fd, &fds); delay.tv_sec = 0; delay.tv_usec = 0; while ((n = pth_sc(select)(fd+1, &fds, NULL, NULL, &delay)) < 0 && errno == EINTR) ; if (n < 0 && (errno == EINVAL || errno == EBADF)) return pth_error(-1, errno); /* if filedescriptor is still not readable, let thread sleep until it is or the extra event occurs */ if (n == 0) { ev = pth_event(PTH_EVENT_FD|PTH_UNTIL_FD_READABLE|PTH_MODE_STATIC, &ev_key, fd); if (ev_extra != NULL) pth_event_concat(ev, ev_extra, NULL); n = pth_wait(ev); if (ev_extra != NULL) { pth_event_isolate(ev); if (pth_event_status(ev) != PTH_STATUS_OCCURRED) return pth_error(-1, EINTR); } } } /* now perform the actual read. We're now guarrantied to not block, either because we were already in non-blocking mode or we determined above by polling that the next recvfrom(2) call will not block. But keep in mind, that only 1 next recvfrom(2) call is guarrantied to not block (except for the EINTR situation). */ while ((n = pth_sc(recvfrom)(fd, buf, nbytes, flags, from, fromlen)) < 0 && errno == EINTR) ; pth_debug2("pth_recvfrom_ev: leave to thread \"%s\"", pth_current->name); return n;}/* Pth variant of SUSv2 send(2) */ssize_t pth_send(int s, const void *buf, size_t len, int flags){ return pth_send_ev(s, buf, len, flags, NULL);}/* Pth variant of SUSv2 send(2) with extra event(s) */ssize_t pth_send_ev(int s, const void *buf, size_t len, int flags, pth_event_t ev){ return pth_sendto_ev(s, buf, len, flags, NULL, 0, ev);}/* Pth variant of SUSv2 sendto(2) */ssize_t pth_sendto(int s, const void *buf, size_t len, int flags, const struct sockaddr *to, socklen_t tolen){ return pth_sendto_ev(s, buf, len, flags, to, tolen, NULL);}/* Pth variant of SUSv2 sendto(2) with extra event(s) */ssize_t pth_sendto_ev(int fd, const void *buf, size_t nbytes, int flags, const struct sockaddr *to, socklen_t tolen, pth_event_t ev_extra){ struct timeval delay; pth_event_t ev; static pth_key_t ev_key = PTH_KEY_INIT; fd_set fds; int fdmode; ssize_t rv; ssize_t s; int n; pth_implicit_init(); pth_debug2("pth_sendto_ev: enter from thread \"%s\"", pth_current->name); /* POSIX compliance */ if (nbytes == 0) return 0; if (!pth_util_fd_valid(fd)) return pth_error(-1, EBADF); /* force filedescriptor into non-blocking mode */ if ((fdmode = pth_fdmode(fd, PTH_FDMODE_NONBLOCK)) == PTH_FDMODE_ERROR) return pth_error(-1, EBADF); /* poll filedescriptor if not already in non-blocking operation */ if (fdmode != PTH_FDMODE_NONBLOCK) { /* now directly poll filedescriptor for writeability to avoid unneccessary (and resource consuming because of context switches, etc) event handling through the scheduler */ if (!pth_util_fd_valid(fd)) { pth_fdmode(fd, fdmode); return pth_error(-1, EBADF); } FD_ZERO(&fds); FD_SET(fd, &fds); delay.tv_sec = 0; delay.tv_usec = 0; while ((n = pth_sc(select)(fd+1, NULL, &fds, NULL, &delay)) < 0 && errno == EINTR) ; if (n < 0 && (errno == EINVAL || errno == EBADF)) return pth_error(-1, errno); rv = 0; for (;;) { /* if filedescriptor is still not writeable, let thread sleep until it is or event occurs */ if (n == 0) { ev = pth_event(PTH_EVENT_FD|PTH_UNTIL_FD_WRITEABLE|PTH_MODE_STATIC, &ev_key, fd); if (ev_extra != NULL) pth_event_concat(ev, ev_extra, NULL); pth_wait(ev); if (ev_extra != NULL) { pth_event_isolate(ev); if (pth_event_status(ev) != PTH_STATUS_OCCURRED) { pth_fdmode(fd, fdmode); return pth_error(-1, EINTR); } } } /* now perform the actual send operation */ while ((s = pth_sc(sendto)(fd, buf, nbytes, flags, to, tolen)) < 0 && errno == EINTR) ; if (s > 0) rv += s; /* although we're physically now in non-blocking mode, iterate unless all data is written or an error occurs, because we've to mimic the usual blocking I/O behaviour of write(2). */ if (s > 0 && s < (ssize_t)nbytes) { nbytes -= s; buf = (void *)((char *)buf + s); n = 0; continue; } /* pass error to caller, but not for partial writes (rv > 0) */ if (s < 0 && rv == 0) rv = -1; /* stop looping */ break; } } else { /* just perform the actual send operation */ while ((rv = pth_sc(sendto)(fd, buf, nbytes, flags, to, tolen)) < 0 && errno == EINTR) ; } /* restore filedescriptor mode */ pth_shield { pth_fdmode(fd, fdmode); } pth_debug2("pth_sendto_ev: leave to thread \"%s\"", pth_current->name); return rv;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -