📄 ioqueue_common_abs.c
字号:
}
# if PJ_HAS_TCP
if (!pj_list_empty(&h->accept_list)) {
struct accept_operation *accept_op;
/* Get one accept operation from the list. */
accept_op = h->accept_list.next;
pj_list_erase(accept_op);
accept_op->op = 0;
/* Clear bit in fdset if there is no more pending accept */
if (pj_list_empty(&h->accept_list))
ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);
rc=pj_sock_accept(h->fd, accept_op->accept_fd,
accept_op->rmt_addr, accept_op->addrlen);
if (rc==PJ_SUCCESS && accept_op->local_addr) {
rc = pj_sock_getsockname(*accept_op->accept_fd,
accept_op->local_addr,
accept_op->addrlen);
}
/* Unlock; from this point we don't need to hold key's mutex. */
pj_mutex_unlock(h->mutex);
/* Call callback. */
if (h->cb.on_accept_complete && !IS_CLOSING(h)) {
(*h->cb.on_accept_complete)(h,
(pj_ioqueue_op_key_t*)accept_op,
*accept_op->accept_fd, rc);
}
}
else
# endif
if (key_has_pending_read(h)) {
struct read_operation *read_op;
pj_ssize_t bytes_read;
/* Get one pending read operation from the list. */
read_op = h->read_list.next;
pj_list_erase(read_op);
/* Clear fdset if there is no pending read. */
if (pj_list_empty(&h->read_list))
ioqueue_remove_from_set(ioqueue, h, READABLE_EVENT);
bytes_read = read_op->size;
if ((read_op->op == PJ_IOQUEUE_OP_RECV_FROM)) {
read_op->op = 0;
rc = pj_sock_recvfrom(h->fd, read_op->buf, &bytes_read,
read_op->flags,
read_op->rmt_addr,
read_op->rmt_addrlen);
} else if ((read_op->op == PJ_IOQUEUE_OP_RECV)) {
read_op->op = 0;
rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read,
read_op->flags);
} else {
pj_assert(read_op->op == PJ_IOQUEUE_OP_READ);
read_op->op = 0;
/*
* User has specified pj_ioqueue_read().
* On Win32, we should do ReadFile(). But because we got
* here because of select() anyway, user must have put a
* socket descriptor on h->fd, which in this case we can
* just call pj_sock_recv() instead of ReadFile().
* On Unix, user may put a file in h->fd, so we'll have
* to call read() here.
* This may not compile on systems that do not have
* read(). That is why only specific platforms are handled
* below, so that the error is easier to catch.
*/
# if defined(PJ_WIN32) && PJ_WIN32 != 0 || \
defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE != 0
rc = pj_sock_recv(h->fd, read_op->buf, &bytes_read,
read_op->flags);
//rc = ReadFile((HANDLE)h->fd, read_op->buf, read_op->size,
// &bytes_read, NULL);
# elif (defined(PJ_HAS_UNISTD_H) && PJ_HAS_UNISTD_H != 0)
bytes_read = read(h->fd, read_op->buf, bytes_read);
rc = (bytes_read >= 0) ? PJ_SUCCESS : pj_get_os_error();
# elif defined(PJ_LINUX_KERNEL) && PJ_LINUX_KERNEL != 0
bytes_read = sys_read(h->fd, read_op->buf, bytes_read);
rc = (bytes_read >= 0) ? PJ_SUCCESS : -bytes_read;
# else
# error "Implement read() for this platform!"
# endif
}
if (rc != PJ_SUCCESS) {
# if defined(PJ_WIN32) && PJ_WIN32 != 0
/* On Win32, for UDP, WSAECONNRESET on the receive side
* indicates that previous sending has triggered ICMP Port
* Unreachable message.
* But we wouldn't know at this point which one of previous
* key that has triggered the error, since UDP socket can
* be shared!
* So we'll just ignore it!
*/
if (rc == PJ_STATUS_FROM_OS(WSAECONNRESET)) {
//PJ_LOG(4,(THIS_FILE,
// "Ignored ICMP port unreach. on key=%p", h));
}
# endif
/* In any case we would report this to caller. */
bytes_read = -rc;
}
/* Unlock; from this point we don't need to hold key's mutex. */
pj_mutex_unlock(h->mutex);
/* Call callback. */
if (h->cb.on_read_complete && !IS_CLOSING(h)) {
(*h->cb.on_read_complete)(h,
(pj_ioqueue_op_key_t*)read_op,
bytes_read);
}
} else {
/*
* This is normal; execution may fall here when multiple threads
* are signalled for the same event, but only one thread is eventually
* able to process the event.
*/
pj_mutex_unlock(h->mutex);
}
}
/*
 * ioqueue_dispatch_exception_event()
 *
 * Handle an exception event reported for a key that has a connect
 * operation in progress: clear the pending connect, stop polling the
 * key for writeable/exception events, then report the outcome through
 * the key's on_connect_complete callback (if one is installed).
 *
 * ioqueue   The ioqueue whose event sets are updated.
 * h         The key on which the exception event was signalled.
 */
void ioqueue_dispatch_exception_event( pj_ioqueue_t *ioqueue,
                                       pj_ioqueue_key_t *h )
{
    pj_status_t status;

    pj_mutex_lock(h->mutex);

    /* Nothing to do when no connect is pending (several threads may have
     * been woken for the same event and another one has already handled
     * it), or when the key is being closed.
     */
    if (!h->connecting || IS_CLOSING(h)) {
        pj_mutex_unlock(h->mutex);
        return;
    }

    /* The connect operation is finished as far as the ioqueue is
     * concerned; stop watching the key for its completion.
     */
    h->connecting = 0;
    ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
    ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);

    /* The callback is invoked without holding the key's mutex. */
    pj_mutex_unlock(h->mutex);

    if (!h->cb.on_connect_complete || IS_CLOSING(h))
        return;

    /* Stays -1 when SO_ERROR is not available on this platform or when
     * the socket option cannot be retrieved.
     */
    status = -1;

#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
    {
        int so_error;
        int so_error_len = sizeof(so_error);

        /* Ask the socket for its pending error and turn it into a status. */
        if (pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
                               &so_error, &so_error_len) == 0)
        {
            status = PJ_RETURN_OS_ERROR(so_error);
        }
    }
#endif

    (*h->cb.on_connect_complete)(h, status);
}
/*
 * pj_ioqueue_recv()
 *
 * Start asynchronous recv() from the socket.
 *
 * Unless PJ_IOQUEUE_ALWAYS_ASYNC is set in flags, a receive is first
 * attempted immediately. If that attempt would block (or if
 * PJ_IOQUEUE_ALWAYS_ASYNC was requested), the operation is queued on the
 * key's read list and the key is added to the ioqueue's readable set.
 *
 * key      The key (socket) to receive from.
 * op_key   Caller-provided operation key; it is used as the storage for
 *          the pending read operation.
 * buffer   Destination buffer for the received data.
 * length   On input, the size of buffer. On PJ_SUCCESS it is updated
 *          with the number of bytes received.
 * flags    recv() flags, optionally combined with
 *          PJ_IOQUEUE_ALWAYS_ASYNC (which is stripped before the flags
 *          are stored in the pending operation).
 *
 * Returns PJ_SUCCESS when data was received immediately, PJ_EPENDING
 * when the operation has been queued, PJ_ECANCELLED when the key is
 * closing, PJ_EINVAL (via PJ_ASSERT_RETURN) when a required argument is
 * NULL, or the error status of the immediate pj_sock_recv() attempt.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recv( pj_ioqueue_key_t *key,
                                     pj_ioqueue_op_key_t *op_key,
                                     void *buffer,
                                     pj_ssize_t *length,
                                     unsigned flags )
{
    struct read_operation *read_op;

    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing. This is done before anything else is
     * touched (same order as pj_ioqueue_recvfrom()), so that a cancelled
     * call leaves the caller's op_key unmodified.
     */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    read_op = (struct read_operation*)op_key;
    read_op->op = 0;

    /* Try to see if there's data immediately available.
     */
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
        pj_status_t status;
        pj_ssize_t size;

        size = *length;
        status = pj_sock_recv(key->fd, buffer, &size, flags);
        if (status == PJ_SUCCESS) {
            /* Yes! Data is available! */
            *length = size;
            return PJ_SUCCESS;
        }

        /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
         * the error to caller.
         */
        if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
            return status;
    }

    /* PJ_IOQUEUE_ALWAYS_ASYNC is an ioqueue-only flag; do not store it
     * in the flags of the pending operation.
     */
    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /*
     * No data is immediately available.
     * Must schedule asynchronous operation to the ioqueue.
     */
    pj_mutex_lock(key->mutex);

    /* Check again, now with the key's mutex held: the key may have
     * started closing after the unlocked check above, and a closing key
     * must not be queued or added to the ioqueue's set. The operation
     * fields are not filled in yet, so op_key is left idle (op == 0).
     */
    if (IS_CLOSING(key)) {
        pj_mutex_unlock(key->mutex);
        return PJ_ECANCELLED;
    }

    read_op->op = PJ_IOQUEUE_OP_RECV;
    read_op->buf = buffer;
    read_op->size = *length;
    read_op->flags = flags;

    pj_list_insert_before(&key->read_list, read_op);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
/*
 * pj_ioqueue_recvfrom()
 *
 * Start asynchronous recvfrom() from the socket.
 *
 * Unless PJ_IOQUEUE_ALWAYS_ASYNC is set in flags, a receive is first
 * attempted immediately. If that attempt would block (or if
 * PJ_IOQUEUE_ALWAYS_ASYNC was requested), the operation is queued on the
 * key's read list and the key is added to the ioqueue's readable set.
 *
 * key      The key (socket) to receive from.
 * op_key   Caller-provided operation key; it is used as the storage for
 *          the pending read operation.
 * buffer   Destination buffer for the received data.
 * length   On input, the size of buffer. On PJ_SUCCESS it is updated
 *          with the number of bytes received.
 * flags    recvfrom() flags, optionally combined with
 *          PJ_IOQUEUE_ALWAYS_ASYNC (which is stripped before the flags
 *          are stored in the pending operation).
 * addr     Passed to pj_sock_recvfrom() to receive the sender address;
 *          the pointer is stored in the pending operation when queued.
 * addrlen  Passed to pj_sock_recvfrom() together with addr; the pointer
 *          is stored in the pending operation when queued.
 *
 * Returns PJ_SUCCESS when data was received immediately, PJ_EPENDING
 * when the operation has been queued, PJ_ECANCELLED when the key is
 * closing, PJ_EINVAL (via PJ_ASSERT_RETURN) when a required argument is
 * NULL, or the error status of the immediate pj_sock_recvfrom() attempt.
 */
PJ_DEF(pj_status_t) pj_ioqueue_recvfrom( pj_ioqueue_key_t *key,
                                         pj_ioqueue_op_key_t *op_key,
                                         void *buffer,
                                         pj_ssize_t *length,
                                         unsigned flags,
                                         pj_sockaddr_t *addr,
                                         int *addrlen)
{
    struct read_operation *read_op;

    PJ_ASSERT_RETURN(key && op_key && buffer && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    read_op = (struct read_operation*)op_key;
    read_op->op = 0;

    /* Try to see if there's data immediately available.
     */
    if ((flags & PJ_IOQUEUE_ALWAYS_ASYNC) == 0) {
        pj_status_t status;
        pj_ssize_t size;

        size = *length;
        status = pj_sock_recvfrom(key->fd, buffer, &size, flags,
                                  addr, addrlen);
        if (status == PJ_SUCCESS) {
            /* Yes! Data is available! */
            *length = size;
            return PJ_SUCCESS;
        }

        /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
         * the error to caller.
         */
        if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL))
            return status;
    }

    /* PJ_IOQUEUE_ALWAYS_ASYNC is an ioqueue-only flag; do not store it
     * in the flags of the pending operation.
     */
    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /*
     * No data is immediately available.
     * Must schedule asynchronous operation to the ioqueue.
     */
    pj_mutex_lock(key->mutex);

    /* Check again, now with the key's mutex held: the key may have
     * started closing after the unlocked check above, and a closing key
     * must not be queued or added to the ioqueue's set. The operation
     * fields are not filled in yet, so op_key is left idle (op == 0).
     */
    if (IS_CLOSING(key)) {
        pj_mutex_unlock(key->mutex);
        return PJ_ECANCELLED;
    }

    read_op->op = PJ_IOQUEUE_OP_RECV_FROM;
    read_op->buf = buffer;
    read_op->size = *length;
    read_op->flags = flags;
    read_op->rmt_addr = addr;
    read_op->rmt_addrlen = addrlen;

    pj_list_insert_before(&key->read_list, read_op);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
/*
* pj_ioqueue_send()
*
* Start asynchronous send() to the descriptor.
*/
PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
const void *data,
pj_ssize_t *length,
unsigned flags)
{
struct write_operation *write_op;
pj_status_t status;
unsigned retry;
pj_ssize_t sent;
PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
PJ_CHECK_STACK();
/* Check if key is closing. */
if (IS_CLOSING(key))
return PJ_ECANCELLED;
/* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write. */
flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);
/* Fast track:
* Try to send data immediately, only if there's no pending write!
* Note:
* We are speculating that the list is empty here without properly
* acquiring ioqueue's mutex first. This is intentional, to maximize
* performance via parallelism.
*
* This should be safe, because:
* - by convention, we require caller to make sure that the
* key is not unregistered while other threads are invoking
* an operation on the same key.
* - pj_list_empty() is safe to be invoked by multiple threads,
* even when other threads are modifying the list.
*/
if (pj_list_empty(&key->write_list)) {
/*
* See if data can be sent immediately.
*/
sent = *length;
status = pj_sock_send(key->fd, data, &sent, flags);
if (status == PJ_SUCCESS) {
/* Success! */
*length = sent;
return PJ_SUCCESS;
} else {
/* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
* the error to caller.
*/
if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
return status;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -