📄 jthread.c
字号:
can_read_from_pipe = (pollarray[--nfd].revents & POLLIN);#else can_read_from_pipe = FD_ISSET(sigPipe[0], &rd);#endif /* drain helper pipe if a byte was written */ if (r > 0 && can_read_from_pipe) { char c; /* NB: since "rd" is a thread-local variable, it can * still say that we should read from the pipe when * in fact another thread has already read from it. * That's why we count how many bytes go in and out. */ if (bytesInPipe > 0) { read(sigPipe[0], &c, 1); bytesInPipe--; } } if (sigPending) { processSignals(); } } if ((r < 0 && errno == EINTR) && !canSleep) goto retry; if (r <= 0) return;DBG(JTHREADDETAIL, dprintf("Select returns %d\n", r); );#if USE_POLL for (i = 0; r > 0 && i < nfd; i++) { int fd; register short rev = pollarray[i].revents; if (rev == 0) { continue; } fd = pollarray[i].fd; needReschedule = true; r--; /* If there's an error, we don't know whether to wake * up readers or writers. So wake up both if so. * Note that things such as failed connect attempts * are reported as errors, not a read or write readiness. */ /* wake up readers when not just POLLOUT */ if (rev != POLLOUT && readQ[fd] != 0) { resumeQueue(readQ[fd]); readQ[fd] = 0; } /* wake up writers when not just POLLIN */ if (rev != POLLIN && writeQ[fd] != 0) { resumeQueue(writeQ[fd]); writeQ[fd] = 0; } }#else for (i = 0; r > 0 && i <= maxFd; i++) { if (readQ[i] != 0 && FD_ISSET(i, &rd)) { needReschedule = true; resumeQueue(readQ[i]); readQ[i] = 0; r--; } if (writeQ[i] != 0 && FD_ISSET(i, &wr)) { needReschedule = true; resumeQueue(writeQ[i]); writeQ[i] = 0; r--; } }#endif return;}/* * A file I/O operation could not be completed. Sleep until we are woken up * by the SIGIO handler. * * Interrupts are disabled on entry and exit. * fd is assumed to be valid. * Returns true if operation was interrupted. */static intblockOnFile(int fd, int op, int timeout){ int rc = false;DBG(JTHREAD, dprintf("blockOnFile(%d,%s)\n", fd, op == TH_READ ? "r":"w"); ); assert(intsDisabled()); BLOCKED_ON_EXTERNAL(currentJThread); if (fd > maxFd) { maxFd = fd; } if (op == TH_READ) { FD_SET(fd, &readsPending); rc = suspendOnQThread(currentJThread, &readQ[fd], timeout); FD_CLR(fd, &readsPending); } else { FD_SET(fd, &writesPending); rc = suspendOnQThread(currentJThread, &writeQ[fd], timeout); FD_CLR(fd, &writesPending); } return (rc);}/*============================================================================ * * locking subsystem * */void jmutex_initialise(jmutex *lock){ lock->holder = NULL; lock->waiting = NULL;}voidjmutex_lock(jmutex *lock){DBG(JTHREAD, dprintf("jmutex_lock(%p)\n", lock); ); intsDisable(); jthread_current()->flags |= THREAD_FLAGS_WAIT_MUTEX; while (lock->holder != NULL) suspendOnQThread(jthread_current(), &lock->waiting, NOTIMEOUT); jthread_current()->flags &= ~THREAD_FLAGS_WAIT_MUTEX; lock->holder = jthread_current(); intsRestore();}voidjmutex_unlock(jmutex *lock){DBG(JTHREAD, dprintf("jmutex_unlock(%p)\n", lock); ); intsDisable(); lock->holder = NULL; if (lock->waiting != 0) { jthread* tid; KaffeNodeQueue* node = lock->waiting; tid = JTHREADQ(node); lock->waiting = node->next; KaffePoolReleaseNode(queuePool, node); assert(tid->status != THREAD_RUNNING); resumeThread(tid); } intsRestore();}voidjmutex_destroy(jmutex *lock){ assert(lock->holder == NULL); assert(lock->waiting == NULL);}/* JThreads does not have special ambiguities concerning the * handling of the GC lock. */voidjthread_lockGC(void){ jmutex_lock(&GClock);}voidjthread_unlockGC(void){ jmutex_unlock(&GClock);}voidjcondvar_initialise(jcondvar *cv){ *cv = NULL;}jbooleanjcondvar_wait(jcondvar *cv, jmutex *lock, jlong timeout){ jthread *current = jthread_current(); jboolean r; intsDisable(); /* give up mutex */ lock->holder = NULL; if (lock->waiting != NULL) { jthread* tid; KaffeNodeQueue* node = lock->waiting; tid = JTHREADQ(node); lock->waiting = node->next; KaffePoolReleaseNode(queuePool, node); assert(tid->status != THREAD_RUNNING); resumeThread(tid); }#if defined(DETECTDEADLOCK) /* a limited wait should not cause us to scream deadlock */ if (timeout != 0) { BLOCKED_ON_EXTERNAL(currentJThread); }#endif /* wait to be signaled */ current->flags |= THREAD_FLAGS_WAIT_CONDVAR; r = suspendOnQThread(current, cv, (long) timeout); current->flags &= ~THREAD_FLAGS_WAIT_CONDVAR; /* reacquire mutex */ current->flags |= THREAD_FLAGS_WAIT_MUTEX; while (lock->holder != NULL) { suspendOnQThread(current, &lock->waiting, NOTIMEOUT); } current->flags &= ~THREAD_FLAGS_WAIT_MUTEX; lock->holder = current; intsRestore(); return (r);}voidjcondvar_signal(jcondvar *cv, jmutex *lock){ intsDisable(); if (*cv != NULL) { KaffeNodeQueue* condQ; /* take off condvar queue */ condQ = *cv; *cv = condQ->next; /* put on lock queue */ condQ->next = lock->waiting; lock->waiting = condQ; } intsRestore();}voidjcondvar_broadcast(jcondvar *cv, jmutex *lock){ intsDisable(); if (*cv != NULL) { /* splice the lists `*cv' and `lock->waiting' */ KaffeNodeQueue** condp; /* advance to last element in cv list */ for (condp = cv; *condp != 0; condp = &(*condp)->next) ; (*condp) = lock->waiting; lock->waiting = *cv; *cv = NULL; } intsRestore();}voidjcondvar_destroy(jcondvar* cv){ assert(*cv == NULL);}/*============================================================================ * * I/O routines that are exported to the user * *//* * Create a threaded file descriptor. * * We try various fcntl and ioctl to put the file descriptor in non-blocking * mode and to enable asynchronous notifications. */static intjthreadedFileDescriptor(int fd){ int r;#if (defined(FIOSSAIOSTAT) && !((defined(hpux) || defined (__hpux__)) && defined(FIOASYNC))) || \ (defined(FIOASYNC) && !defined(linux)) int on = 1;#endif#if (defined(FIOSSAIOOWN) && !((defined(hpux) || defined (__hpux__)) && defined(F_SETOWN))) || \ defined(F_SETOWN) /* cache pid to accommodate antique C libraries */ static int pid = -1; if (pid == -1) pid = getpid();#endif if (fd == -1) return (fd);#if defined(F_SETFD) /* set close-on-exec flag for this file descriptor */ if ((r = fcntl(fd, F_SETFD, 1)) < 0) { perror("F_SETFD"); return (r); }#endif /* Make non-blocking */ if ((r = fcntl(fd, F_GETFL, 0)) < 0) { perror("F_GETFL"); return (r); } /* * Apparently, this can fail, for instance when we stdout is * redirected to /dev/null. (On FreeBSD) */ fcntl(fd, F_SETFL, r | O_NONBLOCK #if defined(O_ASYNC) | O_ASYNC#elif defined(FASYNC) | FASYNC#endif );#if defined(FIOSSAIOSTAT) && !((defined(hpux) || defined (__hpux__)) && defined(FIOASYNC)) r = ioctl(fd, FIOSSAIOSTAT, &on); if (r < 0 && errno != ENOTTY) { /* Defines ENOTTY to be an acceptable error */ perror("FIOSSAIOSTAT"); return (r); }#elif defined(FIOASYNC) && !defined(linux) /* Don't do this on Linux because Linux version < 2.2.5 doesn't * know what FIOASYNC means. It thinks FIOASYNC == O_SYNC. I kid you * not. You can imagine what that means. ;-) * Never mind, FASYNC work as expected and is already set :) */ /* * This ioctl fails for so many systems on so many occasions. * Reasons include ENXIO, ENOTTY, EINVAL(?) */ r = ioctl(fd, FIOASYNC, &on); if (r < 0) { DBG(JTHREAD, perror("FIOASYNC"); ); }#endif#if !(defined(O_ASYNC) || defined(FIOASYNC) || \ defined(FASYNC) || defined(FIOSSAIOSTAT))#error Could not put socket in async mode#endif /* Allow socket to signal this process when new data is available */#if defined(FIOSSAIOOWN) && !((defined(hpux) || defined (__hpux__)) && defined(F_SETOWN)) r = ioctl(fd, FIOSSAIOOWN, &pid); if (r == -1 && errno != ENOTTY) { perror("Error doing FIOSSAIOWN"); } #elif defined(F_SETOWN) /* On some systems, this will flag an error if fd is not a socket */ r = fcntl(fd, F_SETOWN, pid); if (r < 0) { DBG(JTHREAD, perror("F_SETOWN"); ); }#endif return (fd);}/* * clear non-blocking flag for a file descriptor */voidjthreadRestoreFD(int fd){ /* clear nonblocking flag */ fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) & ~O_NONBLOCK);}/* * In SVR4 systems (notably AIX and HPUX 9.x), putting a file descriptor * in non-blocking mode affects the actual terminal file. * Thus, the shell we see the fd in * non-blocking mode when we exit and log the user off. * * Under Solaris, this happens if you use FIONBIO to get into non-blocking * mode. (as opposed to O_NONBLOCK) */static voidrestore_fds(void){ int i; /* clear non-blocking flag for file descriptor stdin, stdout, stderr */ for (i = 0; i < 3; i++) { jthreadRestoreFD(i); }}static voidrestore_fds_and_exit(){ restore_fds(); /* technically, we should restore the original handler and rethrow * the signal. */ KAFFEVM_EXIT(-1); /* XXX */}/* * Threaded socket create. */intjthreadedSocket(int af, int type, int proto, int *out){ int r; intsDisable(); r = socket(af, type, proto); if (r == -1) { r = errno; } else { *out = jthreadedFileDescriptor(r); r = 0; } intsRestore(); return (r);}/* * Return thread-specific data for a given jthread. */threadData*jthread_get_data(jthread_t tid){ return (&tid->localData);}/* * Return thread status. */int jthread_get_status(jthread_t jt){ return (jt->status);}/* * Check if thread is interrupted. */int jthread_is_interrupted(jthread_t jt){ return (jt->flags & THREAD_FLAGS_INTERRUPTED);}int jthread_interrupted(jthread_t jt){ if (jt->flags & THREAD_FLAGS_INTERRUPTED) { jt->flags &= ~THREAD_FLAGS_INTERRUPTED; return 1; } return 0;}int jthread_on_mutex(jthread_t jt){ return (jt->flags & THREAD_FLAGS_WAIT_MUTEX);}int jthread_on_condvar(jthread_t jt){ return (jt->flags & THREAD_FLAGS_WAIT_CONDVAR);}void jthread_clear_run(jthread_t jt){ jt->startUsed = 0;}int jthread_has_run(jthread_t jt){ return (jt->startUsed != 0);}/* * Threaded file open. */intjthreadedOpen(const char* path, int flags, int mode, int *out){ int r; intsDisable(); /* Cygnus WinNT requires this */ r = open(path, flags|O_BINARY, mode); if (r == -1) { r = errno; } else { *out = jthreadedFileDescriptor(r); r = 0; } intsRestore(); return (r);}/* * various building blocks for timeout system call functions */#define SET_DEADLINE(deadline, timeout) \ if (timeout != NOTIMEOUT) { \ jlong ct = currentTime(); \ deadline = timeout + ct; \ if( deadline < ct ) { \ deadline = 0; \ timeout = NOTIMEOUT; \ } \ }#define BREAK_IF_LATE(deadline, timeout) \ if (timeout != NOTIMEOUT) { \ if (currentTime() >= deadline) { \ errno = ETIMEDOUT; \ break; \ } \ }#define IGNORE_EINTR(r) \ if (r == -1 && errno == EINTR) { \ continue; \ }#define SET_RETURN(r) \ if (r == -1) { \ r = errno; \ } #define SET_RETURN_OUT(r, out, ret) \ if (r == -1) { \ r = errno; \ } else { \ *out = ret; \ r = 0; \ }#define CALL_BLOCK_ON_FILE(A, B, C) \ if (blockOnFile(A, B, C)) { \ /* interrupted via jthread_interrupt() */ \ errno = EINTR; \ break; \ }/* * Threaded socket connect. */intjthreadedConnect(int fd, struct sockaddr* addr, int len, int timeout){ int r; jlong deadline = 0; int inProgress = 0; intsDisabl
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -