📄 su_osx_runloop.c
字号:
#endif }/** @internal Send a message to the port. */int su_osx_port_send(su_port_t *self, su_msg_r rmsg){ CFRunLoopRef rl; if (self) { int wakeup; //XXX - mela SU_OSX_PORT_LOCK(self, "su_osx_port_send"); wakeup = self->sup_base->sup_head == NULL; *self->sup_base->sup_tail = rmsg[0]; rmsg[0] = NULL; self->sup_base->sup_tail = &(*self->sup_base->sup_tail)->sum_next;#if SU_HAVE_MBOX /* if (!pthread_equal(pthread_self(), self->sup_tid)) */ if (wakeup) { assert(self->sup_mbox[MBOX_SEND] != INVALID_SOCKET); if (send(self->sup_mbox[MBOX_SEND], "X", 1, 0) == -1) {#if HAVE_SOCKETPAIR if (su_errno() != EWOULDBLOCK)#endif su_perror("su_msg_send: send()"); } }#endif //XXX - mela SU_OSX_PORT_UNLOCK(self, "su_osx_port_send"); rl = CFRunLoopGetCurrent(); CFRunLoopWakeUp(rl); return 0; } else { su_msg_destroy(rmsg); return -1; }}static int o_count;/** @internal * * Register a @c su_wait_t object. The wait object, a callback function and * a argument pointer is stored in the port object. The callback function * will be called when the wait object is signaled. * * Please note if identical wait objects are inserted, only first one is * ever signalled. * * @param self pointer to port * @param root pointer to root object * @param waits pointer to wait object * @param callback callback function pointer * @param arg argument given to callback function when it is invoked * @param priority relative priority of the wait object * (0 is normal, 1 important, 2 realtime) * * @return * The function @su_osx_port_register returns nonzero index of the wait object, * or -1 upon an error. */int su_osx_port_register(su_port_t *self, su_root_t *root, su_wait_t *wait, su_wakeup_f callback, su_wakeup_arg_t *arg, int priority){ int i, j, n; CFRunLoopRef rl; CFRunLoopSourceRef *sources, source; CFSocketRef cf_socket, *sockets; int events = 0; struct osx_magic *osx_magic = NULL; CFSocketContext cf_socket_cntx[1] = {{0, NULL, NULL, NULL, NULL}}; CFOptionFlags flags = 0; // XXX - mela assert(SU_OSX_PORT_OWN_THREAD(self)); n = self->sup_n_waits; if (n >= SU_WAIT_MAX) return su_seterrno(ENOMEM); if (n >= self->sup_size_waits) { /* Reallocate size arrays */ int size; int *indices; int *reverses; su_wait_t *waits; su_wakeup_f *wait_cbs; su_wakeup_arg_t **wait_args; su_root_t **wait_tasks; if (self->sup_size_waits == 0) size = su_root_size_hint; else size = 2 * self->sup_size_waits; if (size < SU_WAIT_MIN) size = SU_WAIT_MIN; /* Too large */ if (-3 - size > 0) return (errno = ENOMEM), -1; indices = realloc(self->sup_indices, (size + 1) * sizeof(*indices)); if (indices) { self->sup_indices = indices; for (i = self->sup_size_waits; i <= size; i++) indices[i] = -1 - i; } reverses = realloc(self->sup_reverses, size * sizeof(*waits)); if (reverses) { for (i = self->sup_size_waits; i < size; i++) reverses[i] = -1; self->sup_reverses = reverses; } sources = realloc(self->sup_sources, size * sizeof(*sources)); if (sources) self->sup_sources = sources; sockets = realloc(self->sup_sockets, size * sizeof(*sockets)); if (sockets) self->sup_sockets = sockets; waits = realloc(self->sup_waits, size * sizeof(*waits)); if (waits) self->sup_waits = waits; wait_cbs = realloc(self->sup_wait_cbs, size * sizeof(*wait_cbs)); if (wait_cbs) self->sup_wait_cbs = wait_cbs; wait_args = realloc(self->sup_wait_args, size * sizeof(*wait_args)); if (wait_args) self->sup_wait_args = wait_args; /* Add sup_wait_roots array, if needed */ wait_tasks = realloc(self->sup_wait_roots, size * sizeof(*wait_tasks)); if (wait_tasks) self->sup_wait_roots = wait_tasks; if (!(indices && reverses && sources && sockets && waits && wait_cbs && wait_args && wait_tasks)) { return -1; } self->sup_size_waits = size; } i = -self->sup_indices[0]; assert(i <= self->sup_size_waits); if (priority > 0) { /* Insert */ for (n = self->sup_n_waits; n > 0; n--) { j = self->sup_reverses[n-1]; assert(self->sup_indices[j] == n - 1); self->sup_indices[j] = n; self->sup_reverses[n] = self->sup_reverses[n-1]; self->sup_sources[n] = self->sup_sources[n-1]; self->sup_sockets[n] = self->sup_sockets[n-1]; self->sup_waits[n] = self->sup_waits[n-1]; self->sup_wait_cbs[n] = self->sup_wait_cbs[n-1]; self->sup_wait_args[n] = self->sup_wait_args[n-1]; self->sup_wait_roots[n] = self->sup_wait_roots[n-1]; } self->sup_pri_offset++; } else { /* Append - no need to move anything */ n = self->sup_n_waits; } self->sup_n_waits++; self->sup_indices[0] = self->sup_indices[i]; /* Free index */ self->sup_indices[i] = n; self->sup_reverses[n] = i; self->sup_waits[n] = *wait; self->sup_wait_cbs[n] = callback; self->sup_wait_args[n] = arg; self->sup_wait_roots[n] = root; self->sup_registers++; /* XXX -- mela: leak, leak -- free() somewheeeere */ osx_magic = calloc(1, sizeof(*osx_magic)); osx_magic->o_port = self; osx_magic->o_current = i; osx_magic->o_count = ++o_count; cf_socket_cntx->info = osx_magic; events = map_poll_event_to_cf_event(wait->events); cf_socket = CFSocketCreateWithNative(NULL, (CFSocketNativeHandle) su_wait_socket(wait), events, su_osx_port_socket_cb, cf_socket_cntx); flags = CFSocketGetSocketFlags(cf_socket); flags &= ~kCFSocketCloseOnInvalidate; CFSocketSetSocketFlags(cf_socket, flags); CFRetain(cf_socket); source = CFSocketCreateRunLoopSource(NULL, cf_socket, 0); SU_DEBUG_9(("source(%p): count %u index %d\n", source, o_count, i)); rl = CFRunLoopGetCurrent(); CFRunLoopAddSource(rl, source, kCFRunLoopDefaultMode); CFRetain(source); self->sup_sources[n] = source; self->sup_sockets[n] = cf_socket; CFRunLoopWakeUp(rl); /* Just like epoll, we return -1 or positive integer */ return i;}/** Deregister a su_wait_t object. */staticint su_osx_port_deregister0(su_port_t *self, int i){ CFRunLoopRef rl; int n, N, *indices, *reverses; indices = self->sup_indices; reverses = self->sup_reverses; n = indices[i]; assert(n >= 0); assert(i == reverses[n]); N = --self->sup_n_waits; rl = CFRunLoopGetCurrent(); CFSocketInvalidate(self->sup_sockets[n]); CFRelease(self->sup_sockets[n]); CFRunLoopRemoveSource(rl, self->sup_sources[n], kCFRunLoopDefaultMode); CFRelease(self->sup_sources[n]); CFRunLoopWakeUp(rl); if (n < self->sup_pri_offset) { int j = --self->sup_pri_offset; if (n != j) { assert(reverses[j] > 0); assert(indices[reverses[j]] == j); indices[reverses[j]] = n; reverses[n] = reverses[j]; self->sup_sources[n] = self->sup_sources[j]; self->sup_sockets[n] = self->sup_sockets[j]; self->sup_waits[n] = self->sup_waits[j]; self->sup_wait_cbs[n] = self->sup_wait_cbs[j]; self->sup_wait_args[n] = self->sup_wait_args[j]; self->sup_wait_roots[n] = self->sup_wait_roots[j]; n = j; } } if (n < N) { assert(reverses[N] > 0); assert(indices[reverses[N]] == N); indices[reverses[N]] = n; reverses[n] = reverses[N]; self->sup_sources[n] = self->sup_sources[N]; self->sup_sockets[n] = self->sup_sockets[N]; self->sup_waits[n] = self->sup_waits[N]; self->sup_wait_cbs[n] = self->sup_wait_cbs[N]; self->sup_wait_args[n] = self->sup_wait_args[N]; self->sup_wait_roots[n] = self->sup_wait_roots[N]; n = N; } reverses[n] = -1; memset(&self->sup_waits[n], 0, sizeof self->sup_waits[n]); self->sup_sources[n] = NULL; self->sup_sockets[n] = NULL; self->sup_wait_cbs[n] = NULL; self->sup_wait_args[n] = NULL; self->sup_wait_roots[n] = NULL; indices[i] = indices[0]; indices[0] = -i; self->sup_registers++; return i;}/** Unregister a su_wait_t object. * * The function su_osx_port_unregister() unregisters a su_wait_t object. The * wait object, a callback function and a argument are removed from the * port object. * * @param self - pointer to port object * @param root - pointer to root object * @param wait - pointer to wait object * @param callback - callback function pointer (may be NULL) * @param arg - argument given to callback function when it is invoked * (may be NULL) * * @return Nonzero index of the wait object, or -1 upon an error. */int su_osx_port_unregister(su_port_t *self, su_root_t *root, su_wait_t *wait, su_wakeup_f callback, /* XXX - ignored */ su_wakeup_arg_t *arg){ int n, N; assert(self); // XXX - mela assert(SU_OSX_PORT_OWN_THREAD(self)); N = self->sup_n_waits; for (n = 0; n < N; n++) { if (SU_WAIT_CMP(wait[0], self->sup_waits[n]) == 0) { return su_osx_port_deregister0(self, self->sup_reverses[n]); } } su_seterrno(ENOENT); return -1;}/** Deregister a su_wait_t object. * * The function su_osx_port_deregister() deregisters a su_wait_t registrattion. * The wait object, a callback function and a argument are removed from the * port object. * * @param self - pointer to port object * @param i - registration index * * @return Index of the wait object, or -1 upon an error. */int su_osx_port_deregister(su_port_t *self, int i){ su_wait_t wait[1] = { SU_WAIT_INIT }; int retval; assert(self); // XXX - mela assert(SU_OSX_PORT_OWN_THREAD(self)); if (i <= 0 || i > self->sup_size_waits) return su_seterrno(EBADF); if (self->sup_indices[i] < 0) return su_seterrno(EBADF); retval = su_osx_port_deregister0(self, i); su_wait_destroy(wait); return retval;}/** @internal * Unregister all su_wait_t objects. * * The function su_osx_port_unregister_all() unregisters all su_wait_t objects * and destroys all queued timers associated with given root object. * * @param self - pointer to port object * @param root - pointer to root object * * @return Number of wait objects removed. */int su_osx_port_unregister_all(su_port_t *self, su_root_t *root){ int i, j, index, N; int *indices, *reverses; su_wait_t *waits; su_wakeup_f *wait_cbs; su_wakeup_arg_t **wait_args; su_root_t **wait_roots; CFRunLoopRef rl; CFRunLoopSourceRef *sources; CFSocketRef *sockets; // XXX - assert(SU_OSX_PORT_OWN_THREAD(self)); N = self->sup_n_waits; indices = self->sup_indices; reverses = self->sup_reverses; sources = self->sup_sources; sockets = self->sup_sockets; waits = self->sup_waits; wait_cbs = self->sup_wait_cbs; wait_args = self->sup_wait_args; wait_roots = self->sup_wait_roots; rl = CFRunLoopGetCurrent(); for (i = j = 0; i < N; i++) { index = reverses[i]; assert(index > 0 && indices[index] == i); if (wait_roots[i] == root) { if (i < self->sup_pri_offset) self->sup_pri_offset--;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -