📄 su_source.c
字号:
n = self->sup_n_waits; if (n >= self->sup_size_waits) { /* Reallocate size arrays */ int size; unsigned *indices; su_wait_t *waits; su_wakeup_f *wait_cbs; su_wakeup_arg_t **wait_args; su_root_t **wait_tasks; if (self->sup_size_waits == 0) size = SU_MIN_WAITS; else size = 2 * self->sup_size_waits; indices = realloc(self->sup_indices, size * sizeof(*indices)); if (indices) { self->sup_indices = indices; for (i = self->sup_size_waits; i < size; i++) indices[i] = UINT_MAX; } for (i = 0; i < self->sup_n_waits; i++) g_source_remove_poll(self->sup_source, (GPollFD*)&self->sup_waits[i]); waits = realloc(self->sup_waits, size * sizeof(*waits)); if (waits) self->sup_waits = waits; for (i = 0; i < self->sup_n_waits; i++) g_source_add_poll(self->sup_source, (GPollFD*)&waits[i]); wait_cbs = realloc(self->sup_wait_cbs, size * sizeof(*wait_cbs)); if (wait_cbs) self->sup_wait_cbs = wait_cbs; wait_args = realloc(self->sup_wait_args, size * sizeof(*wait_args)); if (wait_args) self->sup_wait_args = wait_args; /* Add sup_wait_roots array, if needed */ wait_tasks = realloc(self->sup_wait_roots, size * sizeof(*wait_tasks)); if (wait_tasks) self->sup_wait_roots = wait_tasks; if (!(indices && waits && wait_cbs && wait_args && wait_tasks)) { return -1; } self->sup_size_waits = size; } self->sup_n_waits++; if (priority > 0) { /* Insert */ for (; n > 0; n--) { g_source_remove_poll(self->sup_source, (GPollFD*)&self->sup_waits[n-1]); self->sup_waits[n] = self->sup_waits[n-1]; g_source_add_poll(self->sup_source, (GPollFD*)&self->sup_waits[n]); self->sup_wait_cbs[n] = self->sup_wait_cbs[n-1]; self->sup_wait_args[n] = self->sup_wait_args[n-1]; self->sup_wait_roots[n] = self->sup_wait_roots[n-1]; } } else { /* Append - no need to move anything */ } self->sup_waits[n] = *wait; g_source_add_poll(self->sup_source, (GPollFD*)&self->sup_waits[n]); self->sup_wait_cbs[n] = callback; self->sup_wait_args[n] = arg; self->sup_wait_roots[n] = root; I = self->sup_max_index; for (i = 0; i < I; i++) if (self->sup_indices[i] == UINT_MAX) break; else if (self->sup_indices[i] >= n) self->sup_indices[i]++; if (i == I) self->sup_max_index++; if (n + 1 < self->sup_n_waits) for (j = i; j < I; j++) if (self->sup_indices[j] != UINT_MAX && self->sup_indices[j] >= n) self->sup_indices[j]++; self->sup_indices[i] = n; self->sup_registers++; return i + 1; /* 0 is failure */}/** Unregister a su_wait_t object. * * The function su_source_unregister() unregisters a su_wait_t object. The * wait object, a callback function and a argument are removed from the * port object. * * @param self - pointer to port object * @param root - pointer to root object * @param wait - pointer to wait object * @param callback - callback function pointer (may be NULL) * @param arg - argument given to callback function when it is invoked * (may be NULL) * * @return Nonzero index of the wait object, or -1 upon an error. */int su_source_unregister(su_port_t *self, su_root_t *root, su_wait_t *wait, su_wakeup_f callback, /* XXX - ignored */ su_wakeup_arg_t *arg){ unsigned n, N; unsigned i, I, j, *indices; enter; assert(self); assert(SU_SOURCE_OWN_THREAD(self)); i = (unsigned)-1; N = self->sup_n_waits; I = self->sup_max_index; indices = self->sup_indices; for (n = 0; n < N; n++) { if (SU_WAIT_CMP(wait[0], self->sup_waits[n]) != 0) continue; /* Found - delete it */ if (indices[n] == n) i = n; else for (i = 0; i < I; i++) if (indices[i] == n) break; assert(i < I); indices[i] = UINT_MAX; g_source_remove_poll(self->sup_source, (GPollFD*)&self->sup_waits[n]); self->sup_n_waits = N = N - 1; if (n < N) for (j = 0; j < I; j++) if (self->sup_indices[j] != UINT_MAX && self->sup_indices[j] > n) self->sup_indices[j]--; for (; n < N; n++) { g_source_remove_poll(self->sup_source, (GPollFD*)&self->sup_waits[n+1]); self->sup_waits[n] = self->sup_waits[n+1]; g_source_add_poll(self->sup_source, (GPollFD*)&self->sup_waits[n]); self->sup_wait_cbs[n] = self->sup_wait_cbs[n+1]; self->sup_wait_args[n] = self->sup_wait_args[n+1]; self->sup_wait_roots[n] = self->sup_wait_roots[n+1]; } i += 1; /* 0 is failure */ if (i == I) self->sup_max_index--; break; } self->sup_registers++; return (int)i;}/** Deregister a su_wait_t object. * * The function su_source_deregister() deregisters a su_wait_t registrattion. * The wait object, a callback function and a argument are removed from the * port object. * * @param self - pointer to port object * @param i - registration index * * @return Index of the wait object, or -1 upon an error. */int su_source_deregister(su_port_t *self, int i){ unsigned j, n, N; unsigned I, *indices; su_wait_t wait[1]; enter; assert(self); assert(SU_SOURCE_OWN_THREAD(self)); if (i <= 0) return -1; N = self->sup_n_waits; I = self->sup_max_index; indices = self->sup_indices; assert(i < I + 1); n = indices[i - 1]; if (n == UINT_MAX) return -1; self->sup_n_waits = N = N - 1; wait[0] = self->sup_waits[n]; g_source_remove_poll(self->sup_source, (GPollFD*)&self->sup_waits[n]); if (n < N) for (j = 0; j < I; j++) if (self->sup_indices[j] != UINT_MAX && self->sup_indices[j] > n) self->sup_indices[j]--; for (; n < N; n++) { g_source_remove_poll(self->sup_source, (GPollFD*)&self->sup_waits[n + 1]); self->sup_waits[n] = self->sup_waits[n+1]; g_source_add_poll(self->sup_source, (GPollFD*)&self->sup_waits[n]); self->sup_wait_cbs[n] = self->sup_wait_cbs[n+1]; self->sup_wait_args[n] = self->sup_wait_args[n+1]; self->sup_wait_roots[n] = self->sup_wait_roots[n+1]; } indices[i - 1] = UINT_MAX; if (i == I) self->sup_max_index--; su_wait_destroy(wait); self->sup_registers++; return (int)i;}/** @internal * Unregister all su_wait_t objects. * * The function su_source_unregister_all() unregisters all su_wait_t objects * associated with given root object destroys all queued timers. * * @param self - pointer to port object * @param root - pointer to root object * * @return Number of wait objects removed. */int su_source_unregister_all(su_port_t *self, su_root_t *root){ unsigned i, j; unsigned n_waits; su_wait_t *waits; su_wakeup_f *wait_cbs; su_wakeup_arg_t**wait_args; su_root_t **wait_roots; enter; assert(SU_SOURCE_OWN_THREAD(self)); n_waits = self->sup_n_waits; waits = self->sup_waits; wait_cbs = self->sup_wait_cbs; wait_args = self->sup_wait_args; wait_roots = self->sup_wait_roots; for (i = j = 0; (unsigned)i < n_waits; i++) { if (wait_roots[i] == root) { /* XXX - we should free all resources associated with this */ g_source_remove_poll(self->sup_source, (GPollFD*)&waits[i]); continue; } if (i != j) { g_source_remove_poll(self->sup_source, (GPollFD*)&waits[i]); waits[j] = waits[i]; wait_cbs[j] = wait_cbs[i]; wait_args[j] = wait_args[i]; wait_roots[j] = wait_roots[i]; g_source_add_poll(self->sup_source, (GPollFD*)&waits[i]); } j++; } self->sup_n_waits = j; self->sup_registers++; return n_waits - j;}/**Set mask for a registered event. @internal * * The function su_source_eventmask() sets the mask describing events that can * signal the registered callback. * * @param port pointer to port object * @param index registration index * @param socket socket * @param events new event mask * * @retval 0 when successful, * @retval -1 upon an error. */int su_source_eventmask(su_port_t *self, int index, int socket, int events){ unsigned n; int retval; enter; assert(self); assert(SU_SOURCE_OWN_THREAD(self)); assert(index <= self->sup_max_index); if (index <= 0 || index > self->sup_max_index) return -1; n = self->sup_indices[index - 1]; if (n == UINT_MAX) return -1; g_source_remove_poll(self->sup_source, (GPollFD*)&self->sup_waits[n]); retval = su_wait_mask(&self->sup_waits[n], socket, events); g_source_add_poll(self->sup_source, (GPollFD*)&self->sup_waits[n]); return retval;}staticint su_source_multishot(su_port_t *self, int multishot){ if (multishot == -1) return 1; else if (multishot == 0 || multishot == 1) return 1; /* Always enabled */ else return (errno = EINVAL), -1;}/** @internal Enable threadsafe operation. */staticint su_source_threadsafe(su_port_t *port){ return su_home_threadsafe(port->sup_home);}/** @internal Main loop. * * The function @c su_source_run() runs the main loop * * The function @c su_source_run() runs until @c su_source_break() is called * from a callback. * * @param self pointer to root object * */void su_source_run(su_port_t *self){ GMainContext *gmc; GMainLoop *gml; enter; gmc = g_source_get_context(self->sup_source); if (gmc && g_main_context_acquire(gmc)) { gml = g_main_loop_new(gmc, TRUE); self->sup_main_loop = gml; g_main_loop_run(gml); g_main_loop_unref(gml); self->sup_main_loop = NULL; g_main_context_release(gmc); }}/** @internal * The function @c su_source_break() is used to terminate execution of @c * su_source_run(). It can be called from a callback function. * * @param self pointer to port * */void su_source_break(su_port_t *self){ enter; if (self->sup_main_loop) g_main_loop_quit(self->sup_main_loop);}/** @internal Block until wait object is signaled or timeout. * * This function waits for wait objects and the timers associated with * the root object. When any wait object is signaled or timer is * expired, it invokes the callbacks. * * This function returns when a callback has been invoked or @c tout * milliseconds is elapsed. * * @param self pointer to port * @param tout timeout in milliseconds * * @Return * Milliseconds to the next invocation of timer, or @c SU_WAIT_FOREVER if * there are no active timers. */su_duration_t su_source_step(su_port_t *self, su_duration_t tout){ GMainContext *gmc; enter; gmc = g_source_get_context(self->sup_source); if (gmc && g_main_context_acquire(gmc)) { gint priority = G_MAXINT; if (g_main_context_prepare(gmc, &priority)) { g_main_context_dispatch(gmc); } else { gint timeout = tout > G_MAXINT ? G_MAXINT : tout; gint i, n = 0; GPollFD *fds = NULL; priority = G_MAXINT; n = g_main_context_query(gmc, priority, &timeout, fds, n); if (n > 0) { fds = g_alloca(n * (sizeof *fds)); n = g_main_context_query(gmc, priority, &timeout, fds, n); } if (tout < timeout) timeout = tout; i = su_wait((su_wait_t *)fds, n, timeout); if (g_main_context_check(gmc, priority, fds, n)) g_main_context_dispatch(gmc); } g_main_context_release(gmc); } return 0;}/** @internal * Checks if the calling thread owns the port object. * * @param self pointer to a port object * * @retval true (nonzero) if the calling thread owns the port, * @retval false (zero) otherwise. */int su_source_own_thread(su_port_t const *self){ return self == NULL || SU_SOURCE_OWN_THREAD(self);}#if 0/** @internal * Prints out the contents of the port. * * @param self pointer to a port * @param f pointer to a file (if @c NULL, uses @c stdout). */void su_source_dump(su_port_t const *self, FILE *f){ int i;#define IS_WAIT_IN(x) (((x)->events & SU_WAIT_IN) ? "IN" : "")#define IS_WAIT_OUT(x) (((x)->events & SU_WAIT_OUT) ? "OUT" : "")#define IS_WAIT_ACCEPT(x) (((x)->events & SU_WAIT_ACCEPT) ? "ACCEPT" : "") if (f == NULL) f = stdout; fprintf(f, "su_port_t at %p:\n", self); fprintf(f, "\tport is%s running\n", self->sup_running ? "" : "not "); fprintf(f, "\tport tid %p\n", (void *)self->sup_tid); fprintf(f, "\t%d wait objects\n", self->sup_n_waits); for (i = 0; i < self->sup_n_waits; i++) { }}#endif/* ========================================================================= * Pre-poll() callback */int su_source_add_prepoll(su_port_t *port, su_root_t *root, su_prepoll_f *callback, su_prepoll_magic_t *magic){#if 0 if (port->sup_prepoll) return -1; port->sup_prepoll = callback; port->sup_pp_magic = magic; port->sup_pp_root = root; return 0;#else return -1;#endif}int su_source_remove_prepoll(su_port_t *port, su_root_t *root){#if 0 if (port->sup_pp_root != root) return -1; port->sup_prepoll = NULL; port->sup_pp_magic = NULL; port->sup_pp_root = NULL; return 0;#else return -1;#endif}/* ========================================================================= * Timers */staticsu_timer_t **su_source_timers(su_port_t *self){ return &self->sup_timers;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -