📄 su_port.c
字号:
/* XXX - we should free all resources associated with this */ indices[reverses[i]] = self->sup_free_index; self->sup_free_index = -2 - reverses[i]; continue; } indices[reverses[i]] = j; reverses[j] = reverses[i]; waits[j] = waits[i]; wait_cbs[j] = wait_cbs[i]; wait_args[j] = wait_args[i]; wait_roots[j] = wait_roots[i]; j++; } self->sup_n_waits = j; self->sup_registers++; return n_waits - j;}/**Set mask for a registered event. @internal * * The function su_port_eventmask() sets the mask describing events that can * signal the registered callback. * * @param port pointer to port object * @param index registration index * @param socket socket * @param events new event mask * * @retval 0 when successful, * @retval -1 upon an error. */int su_port_eventmask(su_port_t *self, int index, int socket, int events){ unsigned n; assert(self); assert(SU_PORT_OWN_THREAD(self)); if (index <= 0 || index > self->sup_size_waits) return -1; n = self->sup_indices[index - !SU_HAVE_MBOX]; if (n < 0) return -1; return su_wait_mask(&self->sup_waits[n], socket, events);}/** @internal * * Copies the su_wait_t objects from the port. The number of wait objects * can be found out by calling su_port_query() with @a n_waits as zero. * * @note This function is called only by friends. * * @param self - pointer to port object * @param waits - pointer to array to which wait objects are copied * @param n_waits - number of wait objects fitting in array waits * * @return Number of wait objects, or 0 upon an error. */unsigned su_port_query(su_port_t *self, su_wait_t *waits, unsigned n_waits){ unsigned n; assert(SU_PORT_OWN_THREAD(self)); n = self->sup_n_waits; if (n_waits != 0) { if (waits && n_waits >= n) memcpy(waits, self->sup_waits, n * sizeof(*waits)); else n = 0; } return n;}/** @internal Enable multishot mode. * * The function su_port_multishot() enables, disables or queries the * multishot mode for the port. The multishot mode determines how the events * are scheduled by port. If multishot mode is enabled, port serves all the * sockets that have received network events. If it is disables, only first * socket event is served. * * @param self pointer to port object * @param multishot multishot mode (0 => disables, 1 => enables, -1 => query) * * @retval 0 multishot mode is disabled * @retval 1 multishot mode is enabled * @retval -1 an error occurred */int su_port_multishot(su_port_t *self, int multishot){ if (multishot == -1) return self->sup_multishot; else if (multishot == 0 || multishot == 1) return self->sup_multishot = multishot; else return (errno = EINVAL), -1;}/** @internal Enable threadsafe operation. */staticint su_port_threadsafe(su_port_t *port){ return su_home_threadsafe(port->sup_home);}/** @internal Main loop. * * The function @c su_port_run() waits for wait objects and the timers * associated with the port object. When any wait object is signaled or * timer is expired, it invokes the callbacks, and returns waiting. * * The function @c su_port_run() runs until @c su_port_break() is called * from a callback. * * @param self pointer to port object * */void su_port_run(su_port_t *self){ int i; su_duration_t tout = 0; assert(SU_PORT_OWN_THREAD(self)); for (self->sup_running = 1; self->sup_running;) { tout = 2000; if (self->sup_prepoll) self->sup_prepoll(self->sup_pp_magic, self->sup_pp_root); if (self->sup_head) su_port_getmsgs(self); if (self->sup_timers) su_timer_expire(&self->sup_timers, &tout, su_now()); if (!self->sup_running) break; { su_wait_t *waits = self->sup_waits; unsigned n = self->sup_n_waits; unsigned version = self->sup_registers; if (self->sup_head) tout = 0; i = su_wait(waits, n, tout); if (i >= 0 && (unsigned)i < n) { su_root_t *root;#if HAVE_POLL if (self->sup_multishot) { for (; i < n; i++) { if (waits[i].revents) { root = self->sup_wait_roots[i]; self->sup_wait_cbs[i](root ? su_root_magic(root) : NULL, &waits[i], self->sup_wait_args[i]); /* Callback used su_register()/su_unregister() */ if (version != self->sup_registers) break; } } }#else /* !HAVE_POLL */ if (0) { }#endif else { root = self->sup_wait_roots[i]; self->sup_wait_cbs[i](root ? su_root_magic(root) : NULL, &waits[i], self->sup_wait_args[i]); } } } }}#if tuning/* This version can help tuning... */void su_port_run_tune(su_port_t *self){ int i; int timers = 0, messages = 0, events = 0; su_duration_t tout = 0, tout0; su_time_t started = su_now(), woken = started, bedtime = woken; assert(SU_PORT_OWN_THREAD(self)); for (self->sup_running = 1; self->sup_running;) { tout0 = tout, tout = 2000; timers = 0, messages = 0; if (self->sup_prepoll) self->sup_prepoll(self->sup_pp_magic, self->sup_pp_root); if (self->sup_head) messages = su_port_getmsgs(self); if (self->sup_timers) timers = su_timer_expire(&self->sup_timers, &tout, su_now()); if (messages || timers || events) SU_DEBUG_1(("su_port_run(%p): %.6f: %u messages %u timers %u" "events slept %.6f/%.3f\n", self, su_time_diff(woken, started), messages, timers, events, su_time_diff(woken, bedtime), tout0 * 1e-3)); if (!self->sup_running) break; { su_wait_t *waits = self->sup_waits; unsigned n = self->sup_n_waits; unsigned version = self->sup_registers; events = 0; if (self->sup_head) tout = 0; bedtime = su_now(); i = su_wait(waits, n, tout); woken = su_now(); if (i >= 0 && (unsigned)i < n) { su_root_t *root; if (self->sup_multishot) { for (; i < n; i++) { if (waits[i].revents) { root = self->sup_wait_roots[i]; self->sup_wait_cbs[i](root ? su_root_magic(root) : NULL, &waits[i], self->sup_wait_args[i]); events++; /* Callback used su_register()/su_unregister() */ if (version != self->sup_registers) break; } } } else { root = self->sup_wait_roots[i]; self->sup_wait_cbs[i](root ? su_root_magic(root) : NULL, &waits[i], self->sup_wait_args[i]); events++; } } } }}#endif/** @internal * The function @c su_port_break() is used to terminate execution of @c * su_port_run(). It can be called from a callback function. * * @param self pointer to port * */void su_port_break(su_port_t *self){ self->sup_running = 0; }/** @internal Block until wait object is signaled or timeout. * * This function waits for wait objects and the timers associated with * the root object. When any wait object is signaled or timer is * expired, it invokes the callbacks. * * This function returns when a callback has been invoked or @c tout * milliseconds is elapsed. * * @param self pointer to port * @param tout timeout in milliseconds * * @Return * Milliseconds to the next invocation of timer, or @c SU_WAIT_FOREVER if * there are no active timers. */su_duration_t su_port_step(su_port_t *self, su_duration_t tout){ int i; int timers = 0, messages = 0, events = 0; su_time_t now = su_now(); assert(SU_PORT_OWN_THREAD(self)); if (self->sup_prepoll) self->sup_prepoll(self->sup_pp_magic, self->sup_pp_root); if (self->sup_head) messages = su_port_getmsgs(self); if (self->sup_timers) timers = su_timer_expire(&self->sup_timers, &tout, now); if (self->sup_head) tout = 0; { su_wait_t *waits = self->sup_waits; unsigned n = self->sup_n_waits; unsigned version = self->sup_registers; su_root_t *root; i = su_wait(waits, n, tout); tout = SU_WAIT_FOREVER; if (i >= 0 && (unsigned)i < n) { tout = 0;#if HAVE_POLL if (self->sup_multishot) { for (; i < n; i++) { if (waits[i].revents) { root = self->sup_wait_roots[i]; self->sup_wait_cbs[i](root ? su_root_magic(root) : NULL, &waits[i], self->sup_wait_args[i]); events++; /* Callback function used su_register()/su_unregister() */ if (version != self->sup_registers) break; } } } #else /* !HAVE_POLL */ if (0) { }#endif else { root = self->sup_wait_roots[i]; self->sup_wait_cbs[i](root ? su_root_magic(root) : NULL, &self->sup_waits[i], self->sup_wait_args[i]); events++; } } } if (self->sup_head) messages = su_port_getmsgs(self); if (self->sup_timers) timers += su_timer_expire(&self->sup_timers, &tout, su_now()); if (self->sup_head) tout = 0; return tout;}/** @internal * Checks if the calling thread owns the port object. * * @param self pointer to a port object * * @retval true (nonzero) if the calling thread owns the port, * @retval false (zero) otherwise. */int su_port_own_thread(su_port_t const *self){ return self == NULL || SU_PORT_OWN_THREAD(self);}#if 0/** @internal * Prints out the contents of the port. * * @param self pointer to a port * @param f pointer to a file (if @c NULL, uses @c stdout). */void su_port_dump(su_port_t const *self, FILE *f){ int i;#define IS_WAIT_IN(x) (((x)->events & SU_WAIT_IN) ? "IN" : "")#define IS_WAIT_OUT(x) (((x)->events & SU_WAIT_OUT) ? "OUT" : "")#define IS_WAIT_ACCEPT(x) (((x)->events & SU_WAIT_ACCEPT) ? "ACCEPT" : "") if (f == NULL) f = stdout; fprintf(f, "su_port_t at %p:\n", self); fprintf(f, "\tport is%s running\n", self->sup_running ? "" : "not ");#if SU_HAVE_PTHREADS fprintf(f, "\tport tid %p\n", (void *)self->sup_tid);#endif#if SU_HAVE_MBOX fprintf(f, "\tport mbox %d (%s%s%s)\n", self->sup_mbox[0], IS_WAIT_IN(&self->sup_mbox_wait), IS_WAIT_OUT(&self->sup_mbox_wait), IS_WAIT_ACCEPT(&self->sup_mbox_wait));#endif fprintf(f, "\t%d wait objects\n", self->sup_n_waits); for (i = 0; i < self->sup_n_waits; i++) { }}#endif/* ========================================================================= * Pre-poll() callback */int su_port_add_prepoll(su_port_t *port, su_root_t *root, su_prepoll_f *callback, su_prepoll_magic_t *magic){ if (port->sup_prepoll) return -1; port->sup_prepoll = callback; port->sup_pp_magic = magic; port->sup_pp_root = root; return 0;}int su_port_remove_prepoll(su_port_t *port, su_root_t *root){ if (port->sup_pp_root != root) return -1; port->sup_prepoll = NULL; port->sup_pp_magic = NULL; port->sup_pp_root = NULL; return 0;}/* ========================================================================= * Timers */staticsu_timer_t **su_port_timers(su_port_t *self){ return &self->sup_timers;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -