⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ioqueue_common_abs.c

📁 一个开源SIP协议栈
💻 C
📖 第 1 页 / 共 3 页
字号:
        }
    }

    /*
     * Schedule asynchronous send.
     */
    write_op = (struct write_operation*)op_key;

    /* Spin if write_op has pending operation */
    for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
	pj_thread_sleep(0);

    /* Last chance */
    if (write_op->op) {
	/* Unable to send packet because there is already pending write in the
	 * write_op. We could not put the operation into the write_op
	 * because write_op already contains a pending operation! And
	 * we could not send the packet directly with send() either,
	 * because that will break the order of the packet. So we can
	 * only return error here.
	 *
	 * This could happen for example in multithreads program,
	 * where polling is done by one thread, while other threads are doing
	 * the sending only. If the polling thread runs on lower priority
	 * than the sending thread, then it's possible that the pending
	 * write flag is not cleared in-time because clearing is only done
	 * during polling. 
	 *
	 * Application should specify multiple write operation keys in
	 * situations like this.
	 */
	//pj_assert(!"ioqueue: there is pending operation on this key!");
	return PJ_EBUSY;
    }

    write_op->op = PJ_IOQUEUE_OP_SEND;
    write_op->buf = (void*)data;
    write_op->size = *length;
    write_op->written = 0;
    write_op->flags = flags;
    
    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->write_list, write_op);
    ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}


/*
 * pj_ioqueue_sendto()
 *
 * Send 'data' to 'addr' through the key's socket. The data is sent
 * immediately when the key has no queued write and the socket accepts it;
 * otherwise the request is stored in 'op_key' and queued on the key's
 * write list.
 *
 * Parameters:
 *  key      The ioqueue key owning the socket.
 *  op_key   Operation key; used as storage for the pending write.
 *  data     Buffer to send. Only the pointer is stored when the operation
 *           is queued, the bytes are not copied here.
 *  length   In: number of bytes to send. Out (on PJ_SUCCESS only): number
 *           of bytes reported as sent by pj_sock_sendto().
 *  flags    Send flags; PJ_IOQUEUE_ALWAYS_ASYNC is always cleared.
 *  addr     Destination address.
 *  addrlen  Length of 'addr' in bytes.
 *
 * Returns:
 *  PJ_SUCCESS     The data was sent right away.
 *  PJ_EPENDING    The write was queued on the key.
 *  PJ_ECANCELLED  The key is closing.
 *  PJ_EBUSY       'op_key' still holds a pending operation after spinning
 *                 PENDING_RETRY times.
 *  PJ_EBUG        'addrlen' does not fit the operation's address storage
 *                 (reported through PJ_ASSERT_RETURN).
 *  PJ_EINVAL      A required argument is NULL (reported through
 *                 PJ_ASSERT_RETURN).
 *  other          Any non-"would block" error from pj_sock_sendto().
 */
PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
                                       const void *data,
                                       pj_ssize_t *length,
                                       pj_uint32_t flags,
                                       const pj_sockaddr_t *addr,
                                       int addrlen)
{
    struct write_operation *write_op;
    unsigned retry;
    pj_status_t status;
    pj_ssize_t sent;

    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write */
    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /* Fast track:
     *   Try to send data immediately, only if there's no pending write!
     * Note:
     *  We are speculating that the list is empty here without properly
     *  acquiring ioqueue's mutex first. This is intentional, to maximize
     *  performance via parallelism.
     *
     *  This should be safe, because:
     *      - by convention, we require caller to make sure that the
     *        key is not unregistered while other threads are invoking
     *        an operation on the same key.
     *      - pj_list_empty() is safe to be invoked by multiple threads,
     *        even when other threads are modifying the list.
     */
    if (pj_list_empty(&key->write_list)) {
        /*
         * See if data can be sent immediately.
         */
        sent = *length;
        status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
        if (status == PJ_SUCCESS) {
            /* Success! */
            *length = sent;
            return PJ_SUCCESS;
        }

        /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
         * the error to caller. Otherwise fall through and queue the write.
         */
        if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
            return status;
    }

    /*
     * Schedule asynchronous send.
     */
    write_op = (struct write_operation*)op_key;

    /*
     * Check that the address storage of the operation can hold the address
     * parameter. The bound is taken from the destination buffer itself, and
     * the sign of addrlen is tested explicitly instead of relying on the
     * implicit signed-to-unsigned conversion.
     */
    PJ_ASSERT_RETURN(addrlen >= 0 &&
                     (pj_size_t)addrlen <= sizeof(write_op->rmt_addr),
                     PJ_EBUG);

    /* Spin if write_op has pending operation */
    for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
        pj_thread_sleep(0);

    /* Last chance */
    if (write_op->op) {
        /* Unable to send packet because there is already pending write on the
         * write_op. We could not put the operation into the write_op
         * because write_op already contains a pending operation! And
         * we could not send the packet directly with sendto() either,
         * because that will break the order of the packet. So we can
         * only return error here.
         *
         * This could happen for example in a multithreaded program,
         * where polling is done by one thread, while other threads are doing
         * the sending only. If the polling thread runs on lower priority
         * than the sending thread, then it's possible that the pending
         * write flag is not cleared in time because clearing is only done
         * during polling.
         *
         * Application should specify multiple write operation keys in
         * situations like this.
         */
        return PJ_EBUSY;
    }

    /* Record the request in the operation key. */
    write_op->op = PJ_IOQUEUE_OP_SEND_TO;
    write_op->buf = (void*)data;
    write_op->size = *length;
    write_op->written = 0;
    write_op->flags = flags;
    pj_memcpy(&write_op->rmt_addr, addr, addrlen);
    write_op->rmt_addrlen = addrlen;

    /* Queue it at the tail of the write list and ask the ioqueue to watch
     * the key for writability; both are done under the key's mutex.
     */
    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->write_list, write_op);
    ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}

#if PJ_HAS_TCP
/*
 * pj_ioqueue_accept()
 *
 * Start an accept() on the key's socket. When no accept is already queued
 * on the key, an immediate pj_sock_accept() is attempted first; if it would
 * block, the request is stored in 'op_key' and queued on the key's accept
 * list.
 *
 * Parameters:
 *  key       The ioqueue key owning the listening socket.
 *  op_key    Operation key; used as storage for the pending accept.
 *  new_sock  Receives the accepted socket.
 *  local     Optional; when both 'local' and 'addrlen' are given, receives
 *            the local address of the accepted socket.
 *  remote    Passed to pj_sock_accept() to receive the peer address.
 *  addrlen   Address length argument shared by 'remote' and 'local'.
 *
 * Returns:
 *  PJ_SUCCESS     A connection was accepted right away.
 *  PJ_EPENDING    The accept was queued on the key.
 *  PJ_ECANCELLED  The key is closing.
 *  PJ_EINVAL      key, op_key or new_sock is NULL (reported through
 *                 PJ_ASSERT_RETURN).
 *  other          Any non-"would block" error from pj_sock_accept(), or an
 *                 error from pj_sock_getsockname() (in which case the new
 *                 socket is closed and *new_sock is PJ_INVALID_SOCKET).
 */
PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
                                       pj_sock_t *new_sock,
                                       pj_sockaddr_t *local,
                                       pj_sockaddr_t *remote,
                                       int *addrlen)
{
    struct accept_operation *pending;
    pj_status_t rc;

    /* Only key, op_key and new_sock are mandatory. */
    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);

    /* Refuse new work on a key that is being closed. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    /* Mark the operation key as idle until we know it must be queued. */
    pending = (struct accept_operation*)op_key;
    pending->op = 0;

    /* Fast track: with nothing queued ahead of us, try to pick up a
     * connection right now.
     */
    if (pj_list_empty(&key->accept_list)) {
        rc = pj_sock_accept(key->fd, new_sock, remote, addrlen);

        if (rc == PJ_SUCCESS) {
            /* Got a connection. The local address is only looked up when
             * the caller supplied both output arguments.
             */
            if (!local || !addrlen)
                return PJ_SUCCESS;

            rc = pj_sock_getsockname(*new_sock, local, addrlen);
            if (rc == PJ_SUCCESS)
                return PJ_SUCCESS;

            /* Could not query the local address: drop the new socket and
             * hand the error back.
             */
            pj_sock_close(*new_sock);
            *new_sock = PJ_INVALID_SOCKET;
            return rc;
        }

        /* Anything other than EWOULDBLOCK (or EAGAIN on Linux) is a real
         * error for the caller.
         */
        if (rc != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
            return rc;
    }

    /*
     * Nothing to accept yet. Remember where the results must go so the
     * accept can be completed later when a connection arrives.
     */
    pending->op = PJ_IOQUEUE_OP_ACCEPT;
    pending->accept_fd = new_sock;
    pending->local_addr = local;
    pending->rmt_addr = remote;
    pending->addrlen = addrlen;

    /* Append to the accept list and watch the key for readability, both
     * under the key's mutex.
     */
    pj_mutex_lock(key->mutex);
    pj_list_insert_before(&key->accept_list, pending);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}

/*
 * pj_ioqueue_connect()
 *
 * Start a non-blocking connect() on the key's socket (there is no
 * overlapped version of connect()).
 *
 * Returns:
 *  PJ_SUCCESS     The socket connected immediately.
 *  PJ_EPENDING    The connect is in progress (or one was already marked
 *                 as in progress on this key).
 *  PJ_ECANCELLED  The key is closing.
 *  PJ_EINVAL      key or addr is NULL, or addrlen is zero (reported
 *                 through PJ_ASSERT_RETURN).
 *  other          Any other error from pj_sock_connect().
 */
PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
                                        const pj_sockaddr_t *addr,
                                        int addrlen )
{
    pj_status_t rc;

    /* All arguments are mandatory. */
    PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);

    /* Refuse new work on a key that is being closed. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    /* A connect is already flagged on this key; report it as pending. */
    if (key->connecting != 0)
        return PJ_EPENDING;

    rc = pj_sock_connect(key->fd, addr, addrlen);

    /* Connected right away. */
    if (rc == PJ_SUCCESS)
        return PJ_SUCCESS;

    /* Any failure other than "connect in progress" goes to the caller. */
    if (rc != PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL))
        return rc;

    /* Connect is in progress: flag the key and watch it for both
     * writability and exception events, under the key's mutex.
     */
    pj_mutex_lock(key->mutex);
    key->connecting = PJ_TRUE;
    ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
    ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
#endif	/* PJ_HAS_TCP */


/*
 * pj_ioqueue_op_key_init()
 *
 * Reset an operation key by zero-filling 'size' bytes starting at 'op_key'.
 * Other functions in this file treat a zero 'op' field as "no operation
 * pending", so a key initialized here reads as not pending.
 */
PJ_DEF(void) pj_ioqueue_op_key_init( pj_ioqueue_op_key_t *op_key,
				     pj_size_t size )
{
    /* The caller supplies the size; no bound is checked here. */
    pj_bzero(op_key, size);
}


/*
 * pj_ioqueue_is_pending()
 *
 * Report whether an operation key currently carries an operation, i.e.
 * whether its 'op' field is non-zero. The 'key' argument is not used.
 */
PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
                                         pj_ioqueue_op_key_t *op_key )
{
    PJ_UNUSED_ARG(key);

    /* View the operation key through the generic operation layout, which
     * is enough to read the 'op' field.
     */
    return ((struct generic_operation*)op_key)->op != 0;
}


/*
 * pj_ioqueue_post_completion()
 */
PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
                                                pj_ioqueue_op_key_t *op_key,
                                                pj_ssize_t bytes_status )
{
    struct generic_operation *op_rec;

    /*
     * Find the operation key in all pending operation list to
     * really make sure that it's still there; then call the callback.
     */
    pj_mutex_lock(key->mutex);

    /* Find the operation in the pending read list. */
    op_rec = (struct generic_operation*)key->read_list.next;
    while (op_rec != (void*)&key->read_list) {
        if (op_rec == (void*)op_key) {
            pj_list_erase(op_rec);
            op_rec->op = 0;
            pj_mutex_unlock(key->mutex);

            (*key->cb.on_read_complete)(key, op_key, bytes_status);
            return PJ_SUCCESS;
        }
        op_rec = op_rec->next;
    }

    /* Find the operation in the pending write list. */
    op_rec = (struct generic_operation*)key->write_list.next;
    while (op_rec != (void*)&key->write_list) {
        if (op_rec == (void*)op_key) {
            pj_list_erase(op_rec);
            op_rec->op = 0;
            pj_mutex_unlock(key->mutex);

            (*key->cb.on_write_complete)(key, op_key, bytes_status);
            return PJ_SUCCESS;
        }
        op_rec = op_rec->next;
    }

    /* Find the operation in the pending accept list. */
    op_rec = (struct generic_operation*)key->accept_list.next;
    while (op_rec != (void*)&key->accept_list) {
        if (op_rec == (void*)op_key) {
            pj_list_erase(op_rec);
            op_rec->op = 0;
            pj_mutex_unlock(key->mutex);

            (*key->cb.on_accept_complete)(key, op_key, 
                                          PJ_INVALID_SOCKET,
                                          bytes_status);
            return PJ_SUCCESS;
        }
        op_rec = op_rec->next;
    }

    pj_mutex_unlock(key->mutex);
    
    return PJ_EINVALIDOP;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -