⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ioqueue_common_abs.c

📁 基于sip协议的网络电话源码
💻 C
📖 第 1 页 / 共 3 页
字号:
        }    }    /*     * Schedule asynchronous send.     */    write_op = (struct write_operation*)op_key;    /* Spin if write_op has pending operation */    for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)	pj_thread_sleep(0);    /* Last chance */    if (write_op->op) {	/* Unable to send packet because there is already pending write in the	 * write_op. We could not put the operation into the write_op	 * because write_op already contains a pending operation! And	 * we could not send the packet directly with send() either,	 * because that will break the order of the packet. So we can	 * only return error here.	 *	 * This could happen for example in multithreads program,	 * where polling is done by one thread, while other threads are doing	 * the sending only. If the polling thread runs on lower priority	 * than the sending thread, then it's possible that the pending	 * write flag is not cleared in-time because clearing is only done	 * during polling. 	 *	 * Aplication should specify multiple write operation keys on	 * situation like this.	 */	//pj_assert(!"ioqueue: there is pending operation on this key!");	return PJ_EBUSY;    }    write_op->op = PJ_IOQUEUE_OP_SEND;    write_op->buf = (void*)data;    write_op->size = *length;    write_op->written = 0;    write_op->flags = flags;        pj_mutex_lock(key->mutex);    pj_list_insert_before(&key->write_list, write_op);    ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);    pj_mutex_unlock(key->mutex);    return PJ_EPENDING;}/* * pj_ioqueue_sendto() * * Start asynchronous write() to the descriptor. */PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,                                       pj_ioqueue_op_key_t *op_key,			               const void *data,			               pj_ssize_t *length,                                       pj_uint32_t flags,			               const pj_sockaddr_t *addr,			               int addrlen){    struct write_operation *write_op;    unsigned retry;    pj_status_t status;    pj_ssize_t sent;    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);    PJ_CHECK_STACK();    /* Check if key is closing. */    if (IS_CLOSING(key))	return PJ_ECANCELLED;    /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write */    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);    /* Fast track:     *   Try to send data immediately, only if there's no pending write!     * Note:     *  We are speculating that the list is empty here without properly     *  acquiring ioqueue's mutex first. This is intentional, to maximize     *  performance via parallelism.     *     *  This should be safe, because:     *      - by convention, we require caller to make sure that the     *        key is not unregistered while other threads are invoking     *        an operation on the same key.     *      - pj_list_empty() is safe to be invoked by multiple threads,     *        even when other threads are modifying the list.     */    if (pj_list_empty(&key->write_list)) {        /*         * See if data can be sent immediately.         */        sent = *length;        status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);        if (status == PJ_SUCCESS) {            /* Success! */            *length = sent;            return PJ_SUCCESS;        } else {            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report             * the error to caller.             */            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {                return status;            }	    status = status;        }    }    /*     * Check that address storage can hold the address parameter.     */    PJ_ASSERT_RETURN(addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG);    /*     * Schedule asynchronous send.     */    write_op = (struct write_operation*)op_key;        /* Spin if write_op has pending operation */    for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)	pj_thread_sleep(0);    /* Last chance */    if (write_op->op) {	/* Unable to send packet because there is already pending write on the	 * write_op. We could not put the operation into the write_op	 * because write_op already contains a pending operation! And	 * we could not send the packet directly with sendto() either,	 * because that will break the order of the packet. So we can	 * only return error here.	 *	 * This could happen for example in multithreads program,	 * where polling is done by one thread, while other threads are doing	 * the sending only. If the polling thread runs on lower priority	 * than the sending thread, then it's possible that the pending	 * write flag is not cleared in-time because clearing is only done	 * during polling. 	 *	 * Aplication should specify multiple write operation keys on	 * situation like this.	 */	//pj_assert(!"ioqueue: there is pending operation on this key!");	return PJ_EBUSY;    }    write_op->op = PJ_IOQUEUE_OP_SEND_TO;    write_op->buf = (void*)data;    write_op->size = *length;    write_op->written = 0;    write_op->flags = flags;    pj_memcpy(&write_op->rmt_addr, addr, addrlen);    write_op->rmt_addrlen = addrlen;        pj_mutex_lock(key->mutex);    pj_list_insert_before(&key->write_list, write_op);    ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);    pj_mutex_unlock(key->mutex);    return PJ_EPENDING;}#if PJ_HAS_TCP/* * Initiate overlapped accept() operation. */PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,                                       pj_ioqueue_op_key_t *op_key,			               pj_sock_t *new_sock,			               pj_sockaddr_t *local,			               pj_sockaddr_t *remote,			               int *addrlen){    struct accept_operation *accept_op;    pj_status_t status;    /* check parameters. All must be specified! */    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);    /* Check if key is closing. */    if (IS_CLOSING(key))	return PJ_ECANCELLED;    accept_op = (struct accept_operation*)op_key;    accept_op->op = 0;    /* Fast track:     *  See if there's new connection available immediately.     */    if (pj_list_empty(&key->accept_list)) {        status = pj_sock_accept(key->fd, new_sock, remote, addrlen);        if (status == PJ_SUCCESS) {            /* Yes! New connection is available! */            if (local && addrlen) {                status = pj_sock_getsockname(*new_sock, local, addrlen);                if (status != PJ_SUCCESS) {                    pj_sock_close(*new_sock);                    *new_sock = PJ_INVALID_SOCKET;                    return status;                }            }            return PJ_SUCCESS;        } else {            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report             * the error to caller.             */            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {                return status;            }        }    }    /*     * No connection is available immediately.     * Schedule accept() operation to be completed when there is incoming     * connection available.     */    accept_op->op = PJ_IOQUEUE_OP_ACCEPT;    accept_op->accept_fd = new_sock;    accept_op->rmt_addr = remote;    accept_op->addrlen= addrlen;    accept_op->local_addr = local;    pj_mutex_lock(key->mutex);    pj_list_insert_before(&key->accept_list, accept_op);    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);    pj_mutex_unlock(key->mutex);    return PJ_EPENDING;}/* * Initiate overlapped connect() operation (well, it's non-blocking actually, * since there's no overlapped version of connect()). */PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,					const pj_sockaddr_t *addr,					int addrlen ){    pj_status_t status;        /* check parameters. All must be specified! */    PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);    /* Check if key is closing. */    if (IS_CLOSING(key))	return PJ_ECANCELLED;    /* Check if socket has not been marked for connecting */    if (key->connecting != 0)        return PJ_EPENDING;        status = pj_sock_connect(key->fd, addr, addrlen);    if (status == PJ_SUCCESS) {	/* Connected! */	return PJ_SUCCESS;    } else {	if (status == PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {	    /* Pending! */            pj_mutex_lock(key->mutex);	    key->connecting = PJ_TRUE;            ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);            ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT);            pj_mutex_unlock(key->mutex);	    return PJ_EPENDING;	} else {	    /* Error! */	    return status;	}    }}#endif	/* PJ_HAS_TCP */PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,				     pj_size_t size ){    pj_bzero(op_key, size);}/* * pj_ioqueue_is_pending() */PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,                                         pj_ioqueue_op_key_t *op_key ){    struct generic_operation *op_rec;    PJ_UNUSED_ARG(key);    op_rec = (struct generic_operation*)op_key;    return op_rec->op != 0;}/* * pj_ioqueue_post_completion() */PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,                                                pj_ioqueue_op_key_t *op_key,                                                pj_ssize_t bytes_status ){    struct generic_operation *op_rec;    /*     * Find the operation key in all pending operation list to     * really make sure that it's still there; then call the callback.     */    pj_mutex_lock(key->mutex);    /* Find the operation in the pending read list. */    op_rec = (struct generic_operation*)key->read_list.next;    while (op_rec != (void*)&key->read_list) {        if (op_rec == (void*)op_key) {            pj_list_erase(op_rec);            op_rec->op = 0;            pj_mutex_unlock(key->mutex);            (*key->cb.on_read_complete)(key, op_key, bytes_status);            return PJ_SUCCESS;        }        op_rec = op_rec->next;    }    /* Find the operation in the pending write list. */    op_rec = (struct generic_operation*)key->write_list.next;    while (op_rec != (void*)&key->write_list) {        if (op_rec == (void*)op_key) {            pj_list_erase(op_rec);            op_rec->op = 0;            pj_mutex_unlock(key->mutex);            (*key->cb.on_write_complete)(key, op_key, bytes_status);            return PJ_SUCCESS;        }        op_rec = op_rec->next;    }    /* Find the operation in the pending accept list. */    op_rec = (struct generic_operation*)key->accept_list.next;    while (op_rec != (void*)&key->accept_list) {        if (op_rec == (void*)op_key) {            pj_list_erase(op_rec);            op_rec->op = 0;            pj_mutex_unlock(key->mutex);            (*key->cb.on_accept_complete)(key, op_key,                                           PJ_INVALID_SOCKET,                                          bytes_status);            return PJ_SUCCESS;        }        op_rec = op_rec->next;    }    pj_mutex_unlock(key->mutex);        return PJ_EINVALIDOP;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -