📄 su_port.c
字号:
/** @internal Destroy a port. */void su_port_destroy(su_port_t *self){ assert(self); SU_DEBUG_9(("su_port_destroy() called\n"));#if SU_HAVE_MBOX if (self->sup_mbox[0] != INVALID_SOCKET) { su_port_unregister(self, NULL, &self->sup_mbox_wait, NULL, (su_wakeup_arg_t *)self->sup_mbox); su_wait_destroy(&self->sup_mbox_wait); su_close(self->sup_mbox[0]); self->sup_mbox[0] = INVALID_SOCKET;#if HAVE_SOCKETPAIR su_close(self->sup_mbox[1]); self->sup_mbox[1] = INVALID_SOCKET;#endif SU_DEBUG_9(("su_port_destroy() close mailbox\n")); }#endif if (self->sup_waits) free(self->sup_waits), self->sup_waits = NULL; if (self->sup_wait_cbs) free(self->sup_wait_cbs), self->sup_wait_cbs = NULL; if (self->sup_wait_args) free(self->sup_wait_args), self->sup_wait_args = NULL; if (self->sup_wait_roots) free(self->sup_wait_roots), self->sup_wait_roots = NULL; if (self->sup_reverses) free(self->sup_reverses), self->sup_reverses = NULL; if (self->sup_indices) free(self->sup_indices), self->sup_indices = NULL; SU_DEBUG_9(("su_port_destroy() freed registrations\n")); su_home_zap(self->sup_home); SU_DEBUG_9(("su_port_destroy() returns\n"));}static void su_port_lock(su_port_t *self, char const *who){ SU_PORT_LOCK(self, who);}static void su_port_unlock(su_port_t *self, char const *who){ SU_PORT_UNLOCK(self, who);}static void su_port_incref(su_port_t *self, char const *who){ SU_PORT_INCREF(self, who);}static void su_port_decref(su_port_t *self, int blocking, char const *who){ if (blocking) SU_PORT_ZAPREF(self, who); else SU_PORT_DECREF(self, who);}static struct _GSource *su_port_gsource(su_port_t *self){ return NULL;}#if SU_HAVE_MBOX/** @internal Message box wakeup function. */static int su_port_wakeup(su_root_magic_t *magic, /* NULL */ su_wait_t *w, su_wakeup_arg_t *arg){ char buf[32]; su_socket_t s = *(su_socket_t *)arg; su_wait_events(w, s); recv(s, buf, sizeof(buf), 0); return 0;}#endif/** @internal Send a message to the port. */int su_port_send(su_port_t *self, su_msg_r rmsg){ if (self) { int wakeup; SU_PORT_LOCK(self, "su_port_send"); wakeup = self->sup_head == NULL; *self->sup_tail = rmsg[0]; rmsg[0] = NULL; self->sup_tail = &(*self->sup_tail)->sum_next;#if SU_HAVE_MBOX /* if (!pthread_equal(pthread_self(), self->sup_tid)) */ if (wakeup) { assert(self->sup_mbox[MBOX_SEND] != INVALID_SOCKET); if (send(self->sup_mbox[MBOX_SEND], "X", 1, 0) == -1) {#if HAVE_SOCKETPAIR if (su_errno() != EWOULDBLOCK)#endif su_perror("su_msg_send: send()"); } }#endif SU_PORT_UNLOCK(self, "su_port_send"); return 0; } else { su_msg_destroy(rmsg); return -1; }}/** @internal * Execute the messages in the incoming queue until the queue is empty.. * * @param self - pointer to a port object * * @retval Number of messages sent */int su_port_getmsgs(su_port_t *self){ int n = 0; if (self->sup_head) { su_msg_f f; su_msg_t *msg, *queue; SU_PORT_LOCK(self, "su_port_getmsgs"); queue = self->sup_head; self->sup_tail = &self->sup_head; self->sup_head = NULL; SU_PORT_UNLOCK(self, "su_port_getmsgs"); for (msg = queue; msg; msg = queue) { queue = msg->sum_next; msg->sum_next = NULL; f = msg->sum_func; if (f) f(SU_ROOT_MAGIC(msg->sum_to->sut_root), &msg, msg->sum_data); if (msg && msg->sum_report) su_msg_delivery_report(&msg); else su_msg_destroy(&msg); n++; } /* Check for wait events that may have been generated by this message */ su_port_wait_events(self, 0); } return n;}/** @internal * * Register a @c su_wait_t object. The wait object, a callback function and * a argument pointer is stored in the port object. The callback function * will be called when the wait object is signaled. * * Please note if identical wait objects are inserted, only first one is * ever signalled. * * @param self pointer to port * @param root pointer to root object * @param waits pointer to wait object * @param callback callback function pointer * @param arg argument given to callback function when it is invoked * @param priority relative priority of the wait object * (0 is normal, 1 important, 2 realtime) * * @return * The function @su_port_register returns nonzero index of the wait object, * or -1 upon an error. */int su_port_register(su_port_t *self, su_root_t *root, su_wait_t *wait, su_wakeup_f callback, su_wakeup_arg_t *arg, int priority){ int i, j, n; assert(SU_PORT_OWN_THREAD(self)); n = self->sup_n_waits; if (n >= self->sup_size_waits) { /* Reallocate size arrays */ int size; int *indices; int *reverses; su_wait_t *waits; su_wakeup_f *wait_cbs; su_wakeup_arg_t **wait_args; su_root_t **wait_tasks; assert(self->sup_free_index == -1); if (self->sup_size_waits == 0) size = su_root_size_hint; else size = 2 * self->sup_size_waits; if (size < SU_MIN_WAITS) size = SU_MIN_WAITS; /* Too large */ if (-3 - size > 0) return (errno = ENOMEM), -1; indices = realloc(self->sup_indices, size * sizeof(*indices)); if (indices) { self->sup_indices = indices; for (i = self->sup_size_waits; i < size - 1; i++) indices[i] = -3 - i; if (self->sup_size_waits < size) { indices[i] = -1; self->sup_free_index = -2 - self->sup_size_waits; } } reverses = realloc(self->sup_reverses, size * sizeof(*waits)); if (reverses) { for (i = self->sup_size_waits; i < size; i++) reverses[i] = -1; self->sup_reverses = reverses; } waits = realloc(self->sup_waits, size * sizeof(*waits)); if (waits) self->sup_waits = waits; wait_cbs = realloc(self->sup_wait_cbs, size * sizeof(*wait_cbs)); if (wait_cbs) self->sup_wait_cbs = wait_cbs; wait_args = realloc(self->sup_wait_args, size * sizeof(*wait_args)); if (wait_args) self->sup_wait_args = wait_args; /* Add sup_wait_roots array, if needed */ wait_tasks = realloc(self->sup_wait_roots, size * sizeof(*wait_tasks)); if (wait_tasks) self->sup_wait_roots = wait_tasks; if (!(indices && reverses && waits && wait_cbs && wait_args && wait_tasks)) { return -1; } self->sup_size_waits = size; } self->sup_n_waits++; if (priority > 0) { /* Insert */ for (; n > 0; n--) { self->sup_reverses[n] = self->sup_reverses[n-1]; self->sup_waits[n] = self->sup_waits[n-1]; self->sup_wait_cbs[n] = self->sup_wait_cbs[n-1]; self->sup_wait_args[n] = self->sup_wait_args[n-1]; self->sup_wait_roots[n] = self->sup_wait_roots[n-1]; } self->sup_pri_offset++; } else { /* Append - no need to move anything */ } i = -2 - self->sup_free_index; assert(i < self->sup_size_waits); self->sup_free_index = self->sup_indices[i]; self->sup_reverses[n] = i; self->sup_waits[n] = *wait; self->sup_wait_cbs[n] = callback; self->sup_wait_args[n] = arg; self->sup_wait_roots[n] = root; if (n == 0 && self->sup_n_waits > 1) { for (j = 0; j < self->sup_size_waits; j++) { if (self->sup_indices[j] >= 0) self->sup_indices[j]++; } } self->sup_indices[i] = n; self->sup_registers++; return i + !SU_HAVE_MBOX; /* Mailbox has index 0 */}/** Deregister a su_wait_t object. */staticint su_port_deregister0(su_port_t *self, int i, su_wait_t wait[1]){ int n, j, N, *indices, *reverses; i -= !SU_HAVE_MBOX; N = self->sup_n_waits; indices = self->sup_indices; reverses = self->sup_reverses; n = indices[i]; if (n < 0) return -1; assert(i == self->sup_reverses[n]); self->sup_n_waits = N = N - 1; wait[0] = self->sup_waits[n]; if (N > 0) { if (n < self->sup_pri_offset) { j = self->sup_pri_offset - 1; if (n != j) { assert(reverses[j] >= 0); indices[reverses[j]] = n; self->sup_reverses[n] = self->sup_reverses[j]; self->sup_waits[n] = self->sup_waits[j]; self->sup_wait_cbs[n] = self->sup_wait_cbs[j]; self->sup_wait_args[n] = self->sup_wait_args[j]; self->sup_wait_roots[n] = self->sup_wait_roots[j]; n = j; } self->sup_pri_offset = j; } if (n < N) { assert(reverses[N] >= 0); indices[reverses[N]] = n; self->sup_reverses[n] = self->sup_reverses[N]; self->sup_waits[n] = self->sup_waits[N]; self->sup_wait_cbs[n] = self->sup_wait_cbs[N]; self->sup_wait_args[n] = self->sup_wait_args[N]; self->sup_wait_roots[n] = self->sup_wait_roots[N]; } } indices[i] = self->sup_free_index; self->sup_free_index = -2 - i; self->sup_registers++; return (int)i + !SU_HAVE_MBOX;}/** Unregister a su_wait_t object. * * The function su_port_unregister() unregisters a su_wait_t object. The * wait object, a callback function and a argument are removed from the * port object. * * @param self - pointer to port object * @param root - pointer to root object * @param wait - pointer to wait object * @param callback - callback function pointer (may be NULL) * @param arg - argument given to callback function when it is invoked * (may be NULL) * * @return Nonzero index of the wait object, or -1 upon an error. */int su_port_unregister(su_port_t *self, su_root_t *root, su_wait_t *wait, su_wakeup_f callback, /* XXX - ignored */ su_wakeup_arg_t *arg){ int n, N; int i, *indices; su_wait_t dummy[1]; assert(self); assert(SU_PORT_OWN_THREAD(self)); i = (unsigned)-1; N = self->sup_n_waits; indices = self->sup_indices; for (n = 0; n < N; n++) { if (SU_WAIT_CMP(wait[0], self->sup_waits[n]) == 0) { /* Found - delete it */ if (indices[n] == n) i = n; else for (i = 0; i < self->sup_size_waits; i++) if (indices[i] == n) break; return su_port_deregister0(self, i, dummy); } } su_seterrno(ENOENT); return -1;}/** Deregister a su_wait_t object. * * The function su_port_deregister() deregisters a su_wait_t registrattion. * The wait object, a callback function and a argument are removed from the * port object. * * @param self - pointer to port object * @param i - registration index * * @return Index of the wait object, or -1 upon an error. */int su_port_deregister(su_port_t *self, int i){ su_wait_t wait[1] = { SU_WAIT_INIT }; int retval; assert(self); assert(SU_PORT_OWN_THREAD(self)); if (i <= 0 || i - !SU_HAVE_MBOX >= self->sup_size_waits) return -1; assert(i - !SU_HAVE_MBOX < self->sup_size_waits); retval = su_port_deregister0(self, i, wait); su_wait_destroy(wait); return retval;}/** @internal * Unregister all su_wait_t objects. * * The function su_port_unregister_all() unregisters all su_wait_t objects * and destroys all queued timers associated with given root object. * * @param self - pointer to port object * @param root - pointer to root object *
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -