📄 su_root.c
字号:
* other words, the sub-task can be schedule * -# I/O events with su_root_register() * -# timers with su_timer_set(), su_timer_set_at() or su_timer_run() * -# messages with su_msg_send(). * * Messages can also be used to pass information between tasks or threads. * * In multi-threaded implementation, su_clone_start() launches a new thread, * and the initialization routine is executed by this newly created thread. * The calling thread blocks until the initialization routine completes. If * the initialization routine returns #su_success (0), the sub-task is * considered to be created successfully. After the successful * initialization, the sub-task continues to execeute the function * su_root_run(). * * In single-threaded implementations, just a new root object is created. * The initialization routine is called directly from su_clone_start(). * * If the initalization function @a init fails, the sub-task (either the * newly created thread or the current thread executing the su_clone_start() * function) calls the deinitialization function, and su_clone_start() * returns NULL. * * @param parent root to be cloned (may be NULL if multi-threaded) * @param return_clone reference to a clone [OUT] * @param magic pointer to user data * @param init initialization function * @param deinit deinitialization function * * @return 0 if successfull, -1 upon an error. * * @sa su_root_threading(), su_clone_task(), su_clone_stop(), su_clone_wait(), * su_clone_forget(). */int su_clone_start(su_root_t *parent, su_clone_r return_clone, su_root_magic_t *magic, su_root_init_f init, su_root_deinit_f deinit){ su_root_t *child; int retval = -1; if (parent) { assert(SU_ROOT_OWN_THREAD(parent)); assert(parent->sur_port); }#if !SU_HAVE_PTHREADS else { /* if we don't have threads, we *must* have parent root */ return -1; }#endif child = su_salloc(NULL, sizeof(struct su_root_s));#if SU_HAVE_PTHREADS if (child && (parent == NULL || parent->sur_threading)) { struct clone_args arg = { NULL, NULL, NULL, PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, -1, SU_MSG_R_INIT, NULL }; int thread_created = 0; pthread_t tid; su_port_threadsafe(parent->sur_port); arg.self = child; arg.init = init; arg.deinit = deinit; arg.parent = parent; child->sur_magic = magic; child->sur_deinit = deinit; child->sur_threading = parent->sur_threading; SU_TASK_COPY(child->sur_parent, su_root_task(parent), su_clone_start); pthread_mutex_lock(&arg.mutex); if (pthread_create(&tid, NULL, su_clone_main, &arg) == 0) { pthread_cond_wait(&arg.cv, &arg.mutex); thread_created = 1; } pthread_mutex_unlock(&arg.mutex); if (arg.retval != 0) { if (thread_created) pthread_join(tid, NULL); su_root_destroy(child), child = NULL; } else { retval = 0; *return_clone = *arg.clone; } } else#endif if (child) { assert(parent); child->sur_magic = magic; child->sur_deinit = deinit; child->sur_threading = parent->sur_threading; SU_TASK_COPY(child->sur_parent, su_root_task(parent), su_clone_start); SU_TASK_COPY(child->sur_task, child->sur_parent, su_clone_start); su_task_attach(child->sur_task, child); if (su_msg_create(return_clone, child->sur_task, su_root_task(parent), su_clone_xyzzy, sizeof(child)) == 0) { if (init == NULL || init(child, magic) == 0) { su_cloned_t *sc = su_msg_data(return_clone); sc->sc_root = child;#if SU_HAVE_PTHREADS sc->sc_tid = pthread_self(); pthread_mutex_init(sc->sc_pause, NULL); pthread_cond_init(sc->sc_resume, NULL); pthread_mutex_lock(sc->sc_pause);#endif retval = 0; } else { if (deinit) deinit(child, magic); su_msg_destroy(return_clone); su_root_destroy(child), child = NULL; } } else { su_root_destroy(child), child = NULL; } } return retval;}/** Get reference to clone task. * * @param clone Clone pointer * * @return A reference to the task structure of the clone. */_su_task_r su_clone_task(su_clone_r clone){ return su_msg_to(clone);}/**Forget the clone. * * Normally, the clone task executes until it is stopped. If the parent * task does not need to stop the task, it can "forget" the clone. The * clone exits independently of the parent task. * * @param rclone Reference to the clone. */void su_clone_forget(su_clone_r rclone){ su_msg_destroy(rclone);}/** Stop the clone. * * @deprecated. Use su_clone_wait(). */void su_clone_stop(su_clone_r rclone){ su_msg_send(rclone);}/** Stop a clone and wait until it is has completed. * * The function su_clone_wait() is used to stop the clone task and wait * until it has cleaned up. The clone task is destroyed asynchronously. The * parent sends a message to clone, clone deinitializes itself and then * replies. After the reply message is received by the parent, it will send * a third message back to clone. * * The parent destroy all messages to or from clone task before calling * su_clone_wait(). The parent task may not send any messages to the clone * after calling su_clone_wait(). The su_clone_wait() function blocks until * the cloned task is destroyed. During that time, the parent task must be * prepared to process all the messages sent by clone task. This includes * all the messages sent by clone before destroy message reached the clone. */void su_clone_wait(su_root_t *root, su_clone_r rclone){ su_cloned_t *sc = su_msg_data(rclone); if (sc) {#if SU_HAVE_PTHREADS pthread_t clone_tid = sc->sc_tid;#endif int one = 1; /* This does 3-way handshake. * First, su_clone_break() is executed by clone. * The message is returned to parent (this task), * which executes su_clone_report(). * Then the message is again returned to clone, * which executes su_clone_report2() and exits. */ sc->sc_wait = &one; su_msg_send(rclone); su_root_step(root, 0); su_root_step(root, 0); while (one) su_root_step(root, 10);#if SU_HAVE_PTHREADS if (!pthread_equal(clone_tid, pthread_self())) pthread_join(clone_tid, NULL);#endif }}#if SU_HAVE_PTHREADS /* No-op without threads */staticvoid su_clone_paused(su_root_magic_t *magic, su_msg_r msg, su_msg_arg_t *arg){ su_cloned_t *cloned = *(su_cloned_t **)arg; assert(cloned); pthread_cond_wait(cloned->sc_resume, cloned->sc_pause);}#endif/** Pause a clone. * * Obtain a exclusive lock on clone's private data. * * @retval 0 if successful (and clone is paused) * @retval -1 upon an error */int su_clone_pause(su_clone_r rclone){#if SU_HAVE_PTHREADS /* No-op without threads */ su_cloned_t *cloned = su_msg_data(rclone); su_msg_r m = SU_MSG_R_INIT; if (!cloned) return (errno = EFAULT), -1; if (pthread_equal(pthread_self(), cloned->sc_tid)) return 0; if (su_msg_create(m, su_clone_task(rclone), su_task_null, su_clone_paused, sizeof cloned) < 0) return -1; *(su_cloned_t **)su_msg_data(m) = cloned; if (su_msg_send(m) < 0) return -1; if (pthread_mutex_lock(cloned->sc_pause) < 0) return -1; pthread_cond_signal(cloned->sc_resume);#endif return 0;}/** Resume a clone. * * Give up a exclusive lock on clone's private data. * * @retval 0 if successful (and clone is resumed) * @retval -1 upon an error */int su_clone_resume(su_clone_r rclone){#if SU_HAVE_PTHREADS /* No-op without threads */ su_cloned_t *cloned = su_msg_data(rclone); if (!cloned) return (errno = EFAULT), -1; if (pthread_equal(pthread_self(), cloned->sc_tid)) return 0; if (pthread_mutex_unlock(cloned->sc_pause) < 0) return -1;#endif return 0;}/* ========================================================================= * Messages *//** * Allocates a message of given size. * * The function @c su_msg_create() allocates a message with given data size. * If successful, it moves the new message handle to the @c rmsg. * * @param rmsg handle to the new message (may be uninitialized prior calling) * @param to the recipient task * @param from the sender task * @param wakeup function that is called when message is delivered * @param size size of the message data * * @retval 0 if successful, * @retval -1 if message allocation fails. */int su_msg_create(su_msg_r rmsg, su_task_r const to, su_task_r const from, su_msg_f wakeup, int size){ su_port_t *port = to->sut_port; su_msg_t *msg; SU_PORT_LOCK(port, su_msg_create); msg = su_salloc(NULL /*port->sup_home*/, sizeof(*msg) + size); SU_PORT_UNLOCK(port, su_msg_create); if (msg) { SU_TASK_COPY(msg->sum_to, to, su_msg_create); SU_TASK_COPY(msg->sum_from, from, su_msg_create); msg->sum_func = wakeup; *rmsg = msg; return 0; } *rmsg = NULL; return -1;}/** Add a report function to a message * */int su_msg_report(su_msg_r msg, su_msg_f report){ if (msg && msg[0] && msg[0]->sum_report == NULL) { msg[0]->sum_report = report; return 0; } return -1;}/** * Allocates a reply message of given size. * * @param reply handle to the new message (may be uninitialized prior calling) * @param msg the incoming message * @param wakeup function that is called when message is delivered * @param size size of the message data * * @retval 0 if successful, * @retval -1 otherwise. */int su_msg_reply(su_msg_r reply, su_msg_r const msg, su_msg_f wakeup, int size){ su_msg_r msg0; assert(msg != reply); *msg0 = *msg; *reply = NULL; return su_msg_create(reply, su_msg_from(msg0), su_msg_to(msg0), wakeup, size);}/** Send a delivery report */void su_msg_delivery_report(su_msg_r msg){ su_task_r swap; *swap = *msg[0]->sum_from; *msg[0]->sum_from = *msg[0]->sum_to; *msg[0]->sum_to = *swap; msg[0]->sum_func = msg[0]->sum_report; msg[0]->sum_report = NULL; su_msg_send(msg);}/** Save a message */void su_msg_save(su_msg_r save, su_msg_r msg){ if (save) { if (msg) save[0] = msg[0]; else save[0] = NULL; } if (msg) msg[0] = NULL;}/** * Destroys an unsent message. * * @param rmsg message handle. */void su_msg_destroy(su_msg_r rmsg){ assert(rmsg); if (rmsg[0]) { /* su_port_t *port = rmsg[0]->sum_to->sut_port; */ /* SU_PORT_INCREF(port, su_msg_destroy); */ SU_TASK_ZAP(rmsg[0]->sum_to, su_msg_destroy); SU_TASK_ZAP(rmsg[0]->sum_from, su_msg_destroy); su_free(NULL /* port->sup_home */, rmsg[0]); /* SU_PORT_UNLOCK(port, su_msg_destroy); */ /* SU_PORT_DECREF(port, su_msg_destroy); */ } rmsg[0] = NULL;}/** Gets a pointer to the message data area. * * The function @c su_msg_data() returns a pointer to the message data * area. If @c rmsg contains a @c NULL handle, or message size is 0, @c NULL * pointer is returned. * * @param rmsg message handle * * @return A pointer to the message data area is returned. */su_msg_arg_t *su_msg_data(su_msg_cr rmsg){ if (rmsg[0] && rmsg[0]->sum_size > sizeof(su_msg_t)) return rmsg[0]->sum_data; else return NULL;}/** Get size of message data area. */int su_msg_size(su_msg_cr rmsg){ return rmsg[0] ? rmsg[0]->sum_size - sizeof(su_msg_t) : 0;}/** Get sending task. * * Returns the task handle belonging to the sender of the message. * * If the message handle contains NULL the function @c su_msg_from * returns NULL. * * @param msg message handle * * @return The task handle of the sender is returned. */_su_task_r su_msg_from(su_msg_r const msg){ return msg[0] ? msg[0]->sum_from : NULL;}/** Get destination task. * * The function @c su_msg_from returns the task handle belonging to the * recipient of the message. * * If the message handle contains NULL the function @c su_msg_to * returns NULL. * * @param msg message handle * * @return The task handle of the recipient is returned. */_su_task_r su_msg_to(su_msg_r const msg){ return msg[0] ? msg[0]->sum_to : NULL;}/** Remove references to 'from' and 'to' tasks from a message. * * @param msg message handle */void su_msg_remove_refs(su_msg_r const msg){ if (msg[0]) { su_task_deinit(msg[0]->sum_to); su_task_deinit(msg[0]->sum_from); }}/**Send a message. * * The function @c su_msg_send() sends the message. The message is added to * the recipients message queue, and recipient is waken up. The caller may * not alter the message or the data associated with it after the message * has been sent. * * @param rmsg message handle * * @retval 0 if signal was sent successfully or handle was @c NULL, * @retval -1 otherwise. */int su_msg_send(su_msg_r rmsg){ assert(rmsg); if (rmsg[0]) { su_msg_t *msg = rmsg[0]; assert(msg->sum_to->sut_port); return su_port_send(msg->sum_to->sut_port, rmsg); } return 0; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -