pth_sched.c
                        sigdelset(&pth_current->sigpending, sig);
                        pth_current->sigpendcnt--;
                    }
                    else if (!sigismember(&pth_sigpending, sig)) {
                        /* thread signal not delivered */
                        pth_util_sigdelete(sig);
                    }
                }
            }
        }

        /*
         * Check for stack overflow
         */
        if (pth_current->stackguard != NULL) {
            if (*pth_current->stackguard != 0xDEAD) {
                pth_debug3("pth_scheduler: stack overflow detected for thread 0x%lx (\"%s\")",
                           (unsigned long)pth_current, pth_current->name);
                /*
                 * If the application doesn't catch SIGSEGVs, we terminate
                 * manually with a SIGSEGV now, but output a reasonable message.
                 */
                if (sigaction(SIGSEGV, NULL, &sa) == 0) {
                    if (sa.sa_handler == SIG_DFL) {
                        fprintf(stderr, "**Pth** STACK OVERFLOW: thread pid_t=0x%lx, name=\"%s\"\n",
                                (unsigned long)pth_current, pth_current->name);
                        kill(getpid(), SIGSEGV);
                        sigfillset(&ss);
                        sigdelset(&ss, SIGSEGV);
                        sigsuspend(&ss);
                        abort();
                    }
                }
                /*
                 * Else we terminate only the thread and send ourselves a SIGSEGV,
                 * which allows the application to handle the situation...
                 */
                pth_current->join_arg = (void *)0xDEAD;
                pth_current->state = PTH_STATE_DEAD;
                kill(getpid(), SIGSEGV);
            }
        }

        /*
         * If the previous thread is now marked as dead, kick it out
         */
        if (pth_current->state == PTH_STATE_DEAD) {
            pth_debug2("pth_scheduler: marking thread \"%s\" as dead", pth_current->name);
            if (!pth_current->joinable)
                pth_tcb_free(pth_current);
            else
                pth_pqueue_insert(&pth_DQ, PTH_PRIO_STD, pth_current);
            pth_current = NULL;
        }

        /*
         * If the thread wants to wait for an event,
         * move it to the waiting queue now
         */
        if (pth_current != NULL && pth_current->state == PTH_STATE_WAITING) {
            pth_debug2("pth_scheduler: moving thread \"%s\" to waiting queue",
                       pth_current->name);
            pth_pqueue_insert(&pth_WQ, pth_current->prio, pth_current);
            pth_current = NULL;
        }

        /*
         * Migrate old threads in the ready queue into higher
         * priorities to avoid starvation, and insert the last running
         * thread back into this queue, too.
         */
        pth_pqueue_increase(&pth_RQ);
        if (pth_current != NULL)
            pth_pqueue_insert(&pth_RQ, pth_current->prio, pth_current);

        /*
         * Manage the events in the waiting queue, i.e. decide whether their
         * events occurred and move them to the ready queue. But wait only if
         * we have no NEW or READY threads anyway.
         */
        if (   pth_pqueue_elements(&pth_RQ) == 0
            && pth_pqueue_elements(&pth_NQ) == 0)
            /* still no NEW or READY threads, so we have to wait for new work */
            pth_sched_eventmanager(&snapshot, FALSE /* wait */);
        else
            /* NEW or READY threads already exist, so just poll for even more work */
            pth_sched_eventmanager(&snapshot, TRUE /* poll */);
    }

    /* NOTREACHED */
    return NULL;
}

/*
 * Check whether some events have already occurred (or failed) and move
 * the corresponding threads from the waiting queue back to the ready queue.
 */
intern void pth_sched_eventmanager(pth_time_t *now, int dopoll)
{
    pth_t nexttimer_thread;
    pth_event_t nexttimer_ev;
    pth_time_t nexttimer_value;
    pth_event_t evh;
    pth_event_t ev;
    pth_t t;
    pth_t tlast;
    int this_occurred;
    int any_occurred;
    fd_set rfds;
    fd_set wfds;
    fd_set efds;
    struct timeval delay;
    struct timeval *pdelay;
    sigset_t oss;
    struct sigaction sa;
    struct sigaction osa[1+PTH_NSIG];
    char minibuf[128];
    int loop_repeat;
    int fdmax;
    int rc;
    int sig;
    int n;

    pth_debug2("pth_sched_eventmanager: enter in %s mode",
               dopoll ? "polling" : "waiting");

    /* entry point for internal looping in event handling */
    loop_entry:
    loop_repeat = FALSE;

    /* initialize fd sets */
    FD_ZERO(&rfds);
    FD_ZERO(&wfds);
    FD_ZERO(&efds);
    fdmax = -1;

    /* initialize signal status */
    sigpending(&pth_sigpending);
    sigfillset(&pth_sigblock);
    sigemptyset(&pth_sigcatch);
    sigemptyset(&pth_sigraised);

    /* initialize next timer */
    pth_time_set(&nexttimer_value, PTH_TIME_ZERO);
    nexttimer_thread = NULL;
    nexttimer_ev = NULL;

    /* for all threads in the waiting queue... */
    any_occurred = FALSE;
    for (t = pth_pqueue_head(&pth_WQ); t != NULL;
         t = pth_pqueue_walk(&pth_WQ, t, PTH_WALK_NEXT)) {

        /* determine signals we block */
        for (sig = 1; sig < PTH_NSIG; sig++)
            if (!sigismember(&(t->mctx.sigs), sig))
                sigdelset(&pth_sigblock, sig);

        /* cancellation support */
        if (t->cancelreq == TRUE)
            any_occurred = TRUE;

        /* ...and all their events... */
        if (t->events == NULL)
            continue;
        /* ...check whether events occurred */
        ev = evh = t->events;
        do {
            if (ev->ev_status == PTH_STATUS_PENDING) {
                this_occurred = FALSE;

                /* Filedescriptor I/O */
                if (ev->ev_type == PTH_EVENT_FD) {
                    /* file descriptors are checked later all at once;
                       here we only assemble them in the fd sets */
                    if (ev->ev_goal & PTH_UNTIL_FD_READABLE)
                        FD_SET(ev->ev_args.FD.fd, &rfds);
                    if (ev->ev_goal & PTH_UNTIL_FD_WRITEABLE)
                        FD_SET(ev->ev_args.FD.fd, &wfds);
                    if (ev->ev_goal & PTH_UNTIL_FD_EXCEPTION)
                        FD_SET(ev->ev_args.FD.fd, &efds);
                    if (fdmax < ev->ev_args.FD.fd)
                        fdmax = ev->ev_args.FD.fd;
                }
                /* Filedescriptor Set Select I/O */
                else if (ev->ev_type == PTH_EVENT_SELECT) {
                    /* file descriptors are checked later all at once;
                       here we only merge the fd sets */
                    pth_util_fds_merge(ev->ev_args.SELECT.nfd,
                                       ev->ev_args.SELECT.rfds, &rfds,
                                       ev->ev_args.SELECT.wfds, &wfds,
                                       ev->ev_args.SELECT.efds, &efds);
                    if (fdmax < ev->ev_args.SELECT.nfd-1)
                        fdmax = ev->ev_args.SELECT.nfd-1;
                }
                /* Signal Set */
                else if (ev->ev_type == PTH_EVENT_SIGS) {
                    for (sig = 1; sig < PTH_NSIG; sig++) {
                        if (sigismember(ev->ev_args.SIGS.sigs, sig)) {
                            /* thread signal handling */
                            if (sigismember(&t->sigpending, sig)) {
                                *(ev->ev_args.SIGS.sig) = sig;
                                sigdelset(&t->sigpending, sig);
                                t->sigpendcnt--;
                                this_occurred = TRUE;
                            }
                            /* process signal handling */
                            if (sigismember(&pth_sigpending, sig)) {
                                if (ev->ev_args.SIGS.sig != NULL)
                                    *(ev->ev_args.SIGS.sig) = sig;
                                pth_util_sigdelete(sig);
                                sigdelset(&pth_sigpending, sig);
                                this_occurred = TRUE;
                            }
                            else {
                                sigdelset(&pth_sigblock, sig);
                                sigaddset(&pth_sigcatch, sig);
                            }
                        }
                    }
                }
                /* Timer */
                else if (ev->ev_type == PTH_EVENT_TIME) {
                    if (pth_time_cmp(&(ev->ev_args.TIME.tv), now) < 0)
                        this_occurred = TRUE;
                    else {
                        /* remember the timer which will elapse next */
                        if ((nexttimer_thread == NULL && nexttimer_ev == NULL) ||
                            pth_time_cmp(&(ev->ev_args.TIME.tv), &nexttimer_value) < 0) {
                            nexttimer_thread = t;
                            nexttimer_ev = ev;
                            pth_time_set(&nexttimer_value, &(ev->ev_args.TIME.tv));
                        }
                    }
                }
                /* Message Port Arrivals */
                else if (ev->ev_type == PTH_EVENT_MSG) {
                    if (pth_ring_elements(&(ev->ev_args.MSG.mp->mp_queue)) > 0)
                        this_occurred = TRUE;
                }
                /* Mutex Release */
                else if (ev->ev_type == PTH_EVENT_MUTEX) {
                    if (!(ev->ev_args.MUTEX.mutex->mx_state & PTH_MUTEX_LOCKED))
                        this_occurred = TRUE;
                }
                /* Condition Variable Signal */
                else if (ev->ev_type == PTH_EVENT_COND) {
                    if (ev->ev_args.COND.cond->cn_state & PTH_COND_SIGNALED) {
                        if (ev->ev_args.COND.cond->cn_state & PTH_COND_BROADCAST)
                            this_occurred = TRUE;
                        else {
                            if (!(ev->ev_args.COND.cond->cn_state & PTH_COND_HANDLED)) {
                                ev->ev_args.COND.cond->cn_state |= PTH_COND_HANDLED;
                                this_occurred = TRUE;
                            }
                        }
                    }
                }
                /* Thread Termination */
                else if (ev->ev_type == PTH_EVENT_TID) {
                    if (   (   ev->ev_args.TID.tid == NULL
                            && pth_pqueue_elements(&pth_DQ) > 0)
                        || (   ev->ev_args.TID.tid != NULL
                            && ev->ev_args.TID.tid->state == ev->ev_goal))
                        this_occurred = TRUE;
                }
                /* Custom Event Function */
                else if (ev->ev_type == PTH_EVENT_FUNC) {
                    if (ev->ev_args.FUNC.func(ev->ev_args.FUNC.arg))
                        this_occurred = TRUE;
                    else {
                        pth_time_t tv;
                        pth_time_set(&tv, now);
                        pth_time_add(&tv, &(ev->ev_args.FUNC.tv));
                        if ((nexttimer_thread == NULL && nexttimer_ev == NULL) ||
                            pth_time_cmp(&tv, &nexttimer_value) < 0) {
                            nexttimer_thread = t;
                            nexttimer_ev = ev;
                            pth_time_set(&nexttimer_value, &tv);
                        }
                    }
                }

                /* tag event if it has occurred */
                if (this_occurred) {
                    pth_debug2("pth_sched_eventmanager: [non-I/O] event occurred for thread \"%s\"",
                               t->name);
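For orientation, below is a minimal usage sketch (not part of pth_sched.c) showing how ordinary application-level GNU Pth calls end up exercising the scheduler code above: pth_wait() on a PTH_EVENT_TIME event parks the calling thread in the waiting queue, where pth_sched_eventmanager() tracks it as the next timer and releases it once the deadline has passed; pth_join() likewise waits on a thread-termination (TID) event. Only public Pth API calls are used; treat it as an illustrative sketch of the intended call flow, not as code from this file.

/*
 * usage_sketch.c - illustrative only, assumes the public GNU Pth API
 * (pth_init, pth_spawn, pth_event, pth_timeout, pth_wait, pth_join).
 */
#include <stdio.h>
#include <pth.h>

static void *worker(void *arg)
{
    /* one-shot timer event, two seconds in the future */
    pth_event_t ev = pth_event(PTH_EVENT_TIME, pth_timeout(2, 0));

    /* blocks this thread: the scheduler moves it to the waiting queue and
       the event manager remembers the timer as the next one to elapse */
    pth_wait(ev);
    pth_event_free(ev, PTH_FREE_THIS);

    printf("worker: timer event occurred\n");
    return NULL;
}

int main(void)
{
    pth_t tid;

    pth_init();                                       /* bootstrap the Pth scheduler */
    tid = pth_spawn(PTH_ATTR_DEFAULT, worker, NULL);
    pth_join(tid, NULL);                              /* waits via a thread-termination event */
    pth_kill();
    return 0;
}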