📄 pth_sched.c
字号:
ev->ev_status = PTH_STATUS_OCCURRED; any_occurred = TRUE; } } } while ((ev = ev->ev_next) != evh); } if (any_occurred) dopoll = TRUE; /* now decide how to poll for fd I/O and timers */ if (dopoll) { /* do a polling with immediate timeout, i.e. check the fd sets only without blocking */ pth_time_set(&delay, PTH_TIME_ZERO); pdelay = &delay; } else if (nexttimer_ev != NULL) { /* do a polling with a timeout set to the next timer, i.e. wait for the fd sets or the next timer */ pth_time_set(&delay, &nexttimer_value); pth_time_sub(&delay, now); pdelay = &delay; } else { /* do a polling without a timeout, i.e. wait for the fd sets only with blocking */ pdelay = NULL; } /* clear pipe and let select() wait for the read-part of the pipe */ while (pth_sc(read)(pth_sigpipe[0], minibuf, sizeof(minibuf)) > 0) ; FD_SET(pth_sigpipe[0], &rfds); if (fdmax < pth_sigpipe[0]) fdmax = pth_sigpipe[0]; /* replace signal actions for signals we've to catch for events */ for (sig = 1; sig < PTH_NSIG; sig++) { if (sigismember(&pth_sigcatch, sig)) { sa.sa_handler = pth_sched_eventmanager_sighandler; sigfillset(&sa.sa_mask); sa.sa_flags = 0; sigaction(sig, &sa, &osa[sig]); } } /* allow some signals to be delivered: Either to our catching handler or directly to the configured handler for signals not catched by events */ pth_sc(sigprocmask)(SIG_SETMASK, &pth_sigblock, &oss); /* now do the polling for filedescriptor I/O and timers WHEN THE SCHEDULER SLEEPS AT ALL, THEN HERE!! */ rc = -1; if (!(dopoll && fdmax == -1)) while ((rc = pth_sc(select)(fdmax+1, &rfds, &wfds, &efds, pdelay)) < 0 && errno == EINTR) ; /* restore signal mask and actions and handle signals */ pth_sc(sigprocmask)(SIG_SETMASK, &oss, NULL); for (sig = 1; sig < PTH_NSIG; sig++) if (sigismember(&pth_sigcatch, sig)) sigaction(sig, &osa[sig], NULL); /* if the timer elapsed, handle it */ if (!dopoll && rc == 0 && nexttimer_ev != NULL) { if (nexttimer_ev->ev_type == PTH_EVENT_FUNC) { /* it was an implicit timer event for a function event, so repeat the event handling for rechecking the function */ loop_repeat = TRUE; } else { /* it was an explicit timer event, standing for its own */ pth_debug2("pth_sched_eventmanager: [timeout] event occurred for thread \"%s\"", nexttimer_thread->name); nexttimer_ev->ev_status = PTH_STATUS_OCCURRED; } } /* if the internal signal pipe was used, adjust the select() results */ if (!dopoll && rc > 0 && FD_ISSET(pth_sigpipe[0], &rfds)) { FD_CLR(pth_sigpipe[0], &rfds); rc--; } /* if an error occurred, avoid confusion in the cleanup loop */ if (rc <= 0) { FD_ZERO(&rfds); FD_ZERO(&wfds); FD_ZERO(&efds); } /* now comes the final cleanup loop where we've to do two jobs: first we've to do the late handling of the fd I/O events and additionally if a thread has one occurred event, we move it from the waiting queue to the ready queue */ /* for all threads in the waiting queue... */ t = pth_pqueue_head(&pth_WQ); while (t != NULL) { /* do the late handling of the fd I/O and signal events in the waiting event ring */ any_occurred = FALSE; if (t->events != NULL) { ev = evh = t->events; do { /* * Late handling for still not occured events */ if (ev->ev_status == PTH_STATUS_PENDING) { /* Filedescriptor I/O */ if (ev->ev_type == PTH_EVENT_FD) { if ( ( ev->ev_goal & PTH_UNTIL_FD_READABLE && FD_ISSET(ev->ev_args.FD.fd, &rfds)) || ( ev->ev_goal & PTH_UNTIL_FD_WRITEABLE && FD_ISSET(ev->ev_args.FD.fd, &wfds)) || ( ev->ev_goal & PTH_UNTIL_FD_EXCEPTION && FD_ISSET(ev->ev_args.FD.fd, &efds)) ) { pth_debug2("pth_sched_eventmanager: " "[I/O] event occurred for thread \"%s\"", t->name); ev->ev_status = PTH_STATUS_OCCURRED; } else if (rc < 0) { /* re-check particular filedescriptor */ int rc2; if (ev->ev_goal & PTH_UNTIL_FD_READABLE) FD_SET(ev->ev_args.FD.fd, &rfds); if (ev->ev_goal & PTH_UNTIL_FD_WRITEABLE) FD_SET(ev->ev_args.FD.fd, &wfds); if (ev->ev_goal & PTH_UNTIL_FD_EXCEPTION) FD_SET(ev->ev_args.FD.fd, &efds); pth_time_set(&delay, PTH_TIME_ZERO); while ((rc2 = pth_sc(select)(ev->ev_args.FD.fd+1, &rfds, &wfds, &efds, &delay)) < 0 && errno == EINTR) ; if (rc2 > 0) { /* cleanup afterwards for next iteration */ FD_CLR(ev->ev_args.FD.fd, &rfds); FD_CLR(ev->ev_args.FD.fd, &wfds); FD_CLR(ev->ev_args.FD.fd, &efds); } else if (rc2 < 0) { /* cleanup afterwards for next iteration */ FD_ZERO(&rfds); FD_ZERO(&wfds); FD_ZERO(&efds); ev->ev_status = PTH_STATUS_FAILED; pth_debug2("pth_sched_eventmanager: " "[I/O] event failed for thread \"%s\"", t->name); } } } /* Filedescriptor Set I/O */ else if (ev->ev_type == PTH_EVENT_SELECT) { if (pth_util_fds_test(ev->ev_args.SELECT.nfd, ev->ev_args.SELECT.rfds, &rfds, ev->ev_args.SELECT.wfds, &wfds, ev->ev_args.SELECT.efds, &efds)) { n = pth_util_fds_select(ev->ev_args.SELECT.nfd, ev->ev_args.SELECT.rfds, &rfds, ev->ev_args.SELECT.wfds, &wfds, ev->ev_args.SELECT.efds, &efds); if (ev->ev_args.SELECT.n != NULL) *(ev->ev_args.SELECT.n) = n; ev->ev_status = PTH_STATUS_OCCURRED; pth_debug2("pth_sched_eventmanager: " "[I/O] event occurred for thread \"%s\"", t->name); } else if (rc < 0) { /* re-check particular filedescriptor set */ int rc2; fd_set *prfds = NULL; fd_set *pwfds = NULL; fd_set *pefds = NULL; fd_set trfds; fd_set twfds; fd_set tefds; if (ev->ev_args.SELECT.rfds) { memcpy(&trfds, ev->ev_args.SELECT.rfds, sizeof(rfds)); prfds = &trfds; } if (ev->ev_args.SELECT.wfds) { memcpy(&twfds, ev->ev_args.SELECT.wfds, sizeof(wfds)); pwfds = &twfds; } if (ev->ev_args.SELECT.efds) { memcpy(&tefds, ev->ev_args.SELECT.efds, sizeof(efds)); pefds = &tefds; } pth_time_set(&delay, PTH_TIME_ZERO); while ((rc2 = pth_sc(select)(ev->ev_args.SELECT.nfd+1, prfds, pwfds, pefds, &delay)) < 0 && errno == EINTR) ; if (rc2 < 0) { ev->ev_status = PTH_STATUS_FAILED; pth_debug2("pth_sched_eventmanager: " "[I/O] event failed for thread \"%s\"", t->name); } } } /* Signal Set */ else if (ev->ev_type == PTH_EVENT_SIGS) { for (sig = 1; sig < PTH_NSIG; sig++) { if (sigismember(ev->ev_args.SIGS.sigs, sig)) { if (sigismember(&pth_sigraised, sig)) { if (ev->ev_args.SIGS.sig != NULL) *(ev->ev_args.SIGS.sig) = sig; pth_debug2("pth_sched_eventmanager: " "[signal] event occurred for thread \"%s\"", t->name); sigdelset(&pth_sigraised, sig); ev->ev_status = PTH_STATUS_OCCURRED; } } } } } /* * post-processing for already occured events */ else { /* Condition Variable Signal */ if (ev->ev_type == PTH_EVENT_COND) { /* clean signal */ if (ev->ev_args.COND.cond->cn_state & PTH_COND_SIGNALED) { ev->ev_args.COND.cond->cn_state &= ~(PTH_COND_SIGNALED); ev->ev_args.COND.cond->cn_state &= ~(PTH_COND_BROADCAST); ev->ev_args.COND.cond->cn_state &= ~(PTH_COND_HANDLED); } } } /* local to global mapping */ if (ev->ev_status != PTH_STATUS_PENDING) any_occurred = TRUE; } while ((ev = ev->ev_next) != evh); } /* cancellation support */ if (t->cancelreq == TRUE) { pth_debug2("pth_sched_eventmanager: cancellation request pending for thread \"%s\"", t->name); any_occurred = TRUE; } /* walk to next thread in waiting queue */ tlast = t; t = pth_pqueue_walk(&pth_WQ, t, PTH_WALK_NEXT); /* * move last thread to ready queue if any events occurred for it. * we insert it with a slightly increased queue priority to it a * better chance to immediately get scheduled, else the last running * thread might immediately get again the CPU which is usually not * what we want, because we oven use pth_yield() calls to give others * a chance. */ if (any_occurred) { pth_pqueue_delete(&pth_WQ, tlast); tlast->state = PTH_STATE_READY; pth_pqueue_insert(&pth_RQ, tlast->prio+1, tlast); pth_debug2("pth_sched_eventmanager: thread \"%s\" moved from waiting " "to ready queue", tlast->name); } } /* perhaps we have to internally loop... */ if (loop_repeat) { pth_time_set(now, PTH_TIME_NOW); goto loop_entry; } pth_debug1("pth_sched_eventmanager: leaving"); return;}intern void pth_sched_eventmanager_sighandler(int sig){ char c; /* remember raised signal */ sigaddset(&pth_sigraised, sig); /* write signal to signal pipe in order to awake the select() */ c = (int)sig; pth_sc(write)(pth_sigpipe[1], &c, sizeof(char)); return;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -