⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ioqueue_common_abs.c

📁 基于sip协议的网络电话源码
💻 C
📖 第 1 页 / 共 3 页
字号:
    }#   if PJ_HAS_TCP    if (!pj_list_empty(&h->accept_list)) {        struct accept_operation *accept_op;	        /* Get one accept operation from the list. */	accept_op = h->accept_list.next;        pj_list_erase(accept_op);        accept_op->op = 0;	/* Clear bit in fdset if there is no more pending accept */        if (pj_list_empty(&h->accept_list))            ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);	rc=pj_sock_accept(h->fd, accept_op->accept_fd,                           accept_op->rmt_addr, accept_op->addrlen);	if (rc==PJ_SUCCESS && accept_op->local_addr) {	    rc = pj_sock_getsockname(*accept_op->accept_fd,                                      accept_op->local_addr,				     accept_op->addrlen);	}        /* Unlock; from this point we don't need to hold key's mutex. */        pj_mutex_unlock(h->mutex);	/* Call callback. */        if (h->cb.on_accept_complete && !IS_CLOSING(h)) {	    (*h->cb.on_accept_complete)(h,                                         (pj_ioqueue_op_key_t*)accept_op,                                        *accept_op->accept_fd, rc);	}    }    else#   endif    if (key_has_pending_read(h)) {        struct read_operation *read_op;        pj_ssize_t bytes_read;        /* Get one pending read operation from the list. */        read_op = h->read_list.next;        pj_list_erase(read_op);        /* Clear fdset if there is no pending read. */        if (pj_list_empty(&h->read_list))            ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);        bytes_read = read_op->size;	if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {	    read_op->op = 0;	    rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read, 				  read_op->flags,				  read_op->rmt_addr,                                   read_op->rmt_addrlen);	} else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) {	    read_op->op = 0;	    rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 			      read_op->flags);        } else {            pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);	    read_op->op = 0;            /*             * User has specified pj_ioqueue_read().             * On Win32, we should do ReadFile(). But because we got             * here because of select() anyway, user must have put a             * socket descriptor on h->fd, which in this case we can             * just call pj_sock_recv() instead of ReadFile().             * On Unix, user may put a file in h->fd, so we'll have             * to call read() here.             * This may not compile on systems which doesn't have              * read(). That's why we only specify PJ_LINUX here so             * that error is easier to catch.             */#	    if defined(PJ_WIN32) && PJ_WIN32 != 0 || \	       defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE != 0                rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read, 				  read_op->flags);                //rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,                //              &bytes_read, NULL);#           elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)                bytes_read = read(h->fd, read_op->buf, bytes_read);                rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();#	    elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0                bytes_read = sys_read(h->fd, read_op->buf, bytes_read);                rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;#           else#               error "Implement read() for this platform!"#           endif        }		if (rc != PJ_SUCCESS) {#	    if defined(PJ_WIN32) && PJ_WIN32 != 0	    /* On Win32, for UDP, WSAECONNRESET on the receive side 	     * indicates that previous sending has triggered ICMP Port 	     * Unreachable message.	     * But we wouldn't know at this point which one of previous 	     * key that has triggered the error, since UDP socket can	     * be shared!	     * So we'll just ignore it!	     */	    if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {		//PJ_LOG(4,(THIS_FILE,                 //          "Ignored ICMP port unreach. on key=%p", h));	    }#	    endif            /* In any case we would report this to caller. */            bytes_read = -rc;	}        /* Unlock; from this point we don't need to hold key's mutex. */        pj_mutex_unlock(h->mutex);	/* Call callback. */        if (h->cb.on_read_complete && !IS_CLOSING(h)) {	    (*h->cb.on_read_complete)(h,                                       (pj_ioqueue_op_key_t*)read_op,                                      bytes_read);        }    } else {        /*         * This is normal; execution may fall here when multiple threads         * are signalled for the same event, but only one thread eventually         * able to process the event.         */        pj_mutex_unlock(h->mutex);    }}void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,                                        pj_ioqueue_key_t *h ){    pj_mutex_lock(h->mutex);    if (!h->connecting) {        /* It is possible that more than one thread was woken up, thus         * the remaining thread will see h->connecting as zero because         * it has been processed by other thread.         */        pj_mutex_unlock(h->mutex);        return;    }    if (IS_CLOSING(h)) {	pj_mutex_unlock(h->mutex);	return;    }    /* Clear operation. */    h->connecting = 0;    ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);    ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);    pj_mutex_unlock(h->mutex);    /* Call callback. */    if (h->cb.on_connect_complete && !IS_CLOSING(h)) {	pj_status_t status = -1;#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)	int value;	int vallen = sizeof(value);	int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR, 				       &value, &vallen);	if (gs_rc == 0) {	    status = PJ_RETURN_OS_ERROR(value);	}#endif	(*h->cb.on_connect_complete)(h, status);    }}/* * pj_ioqueue_recv() * * Start asynchronous recv() from the socket. */PJ_DEF(pj_status_t) pj_ioqueue_recv(  pj_ioqueue_key_t *key,                                      pj_ioqueue_op_key_t *op_key,				      void *buffer,				      pj_ssize_t *length,				      unsigned flags ){    struct read_operation *read_op;    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);    PJ_CHECK_STACK();    read_op = (struct read_operation*)op_key;    read_op->op = 0;    /* Check if key is closing. */    if (IS_CLOSING(key))	return PJ_ECANCELLED;    /* Try to see if there's data immediately available.      */    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {	pj_status_t status;	pj_ssize_t size;	size = *length;	status = pj_sock_recv(key->fd, buffer, &size, flags);	if (status == PJ_SUCCESS) {	    /* Yes! Data is available! */	    *length = size;	    return PJ_SUCCESS;	} else {	    /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report	     * the error to caller.	     */	    if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))		return status;	}    }    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);    /*     * No data is immediately available.     * Must schedule asynchronous operation to the ioqueue.     */    read_op->op = PJ_IOQUEUE_OP_RECV;    read_op->buf = buffer;    read_op->size = *length;    read_op->flags = flags;    pj_mutex_lock(key->mutex);    pj_list_insert_before(&key->read_list, read_op);    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);    pj_mutex_unlock(key->mutex);    return PJ_EPENDING;}/* * pj_ioqueue_recvfrom() * * Start asynchronous recvfrom() from the socket. */PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,                                         pj_ioqueue_op_key_t *op_key,				         void *buffer,				         pj_ssize_t *length,                                         unsigned flags,				         pj_sockaddr_t *addr,				         int *addrlen){    struct read_operation *read_op;    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);    PJ_CHECK_STACK();    /* Check if key is closing. */    if (IS_CLOSING(key))	return PJ_ECANCELLED;    read_op = (struct read_operation*)op_key;    read_op->op = 0;    /* Try to see if there's data immediately available.      */    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {	pj_status_t status;	pj_ssize_t size;	size = *length;	status = pj_sock_recvfrom(key->fd, buffer, &size, flags,				  addr, addrlen);	if (status == PJ_SUCCESS) {	    /* Yes! Data is available! */	    *length = size;	    return PJ_SUCCESS;	} else {	    /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report	     * the error to caller.	     */	    if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))		return status;	}    }    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);    /*     * No data is immediately available.     * Must schedule asynchronous operation to the ioqueue.     */    read_op->op = PJ_IOQUEUE_OP_RECV_FROM;    read_op->buf = buffer;    read_op->size = *length;    read_op->flags = flags;    read_op->rmt_addr = addr;    read_op->rmt_addrlen = addrlen;    pj_mutex_lock(key->mutex);    pj_list_insert_before(&key->read_list, read_op);    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);    pj_mutex_unlock(key->mutex);    return PJ_EPENDING;}/* * pj_ioqueue_send() * * Start asynchronous send() to the descriptor. */PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,                                     pj_ioqueue_op_key_t *op_key,			             const void *data,			             pj_ssize_t *length,                                     unsigned flags){    struct write_operation *write_op;    pj_status_t status;    unsigned retry;    pj_ssize_t sent;    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);    PJ_CHECK_STACK();    /* Check if key is closing. */    if (IS_CLOSING(key))	return PJ_ECANCELLED;    /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write. */    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);    /* Fast track:     *   Try to send data immediately, only if there's no pending write!     * Note:     *  We are speculating that the list is empty here without properly     *  acquiring ioqueue's mutex first. This is intentional, to maximize     *  performance via parallelism.     *     *  This should be safe, because:     *      - by convention, we require caller to make sure that the     *        key is not unregistered while other threads are invoking     *        an operation on the same key.     *      - pj_list_empty() is safe to be invoked by multiple threads,     *        even when other threads are modifying the list.     */    if (pj_list_empty(&key->write_list)) {        /*         * See if data can be sent immediately.         */        sent = *length;        status = pj_sock_send(key->fd, data, &sent, flags);        if (status == PJ_SUCCESS) {            /* Success! */            *length = sent;            return PJ_SUCCESS;        } else {            /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report             * the error to caller.             */            if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {                return status;            }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -