📄 ioqueue_common_abs.c
字号:
}
}
/*
* Schedule asynchronous send.
*/
write_op = (struct write_operation*)op_key;
/* Spin if write_op has pending operation */
for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
pj_thread_sleep(0);
/* Last chance */
if (write_op->op) {
/* Unable to send packet because there is already pending write in the
* write_op. We could not put the operation into the write_op
* because write_op already contains a pending operation! And
* we could not send the packet directly with send() either,
* because that will break the order of the packet. So we can
* only return error here.
*
* This could happen for example in multithreads program,
* where polling is done by one thread, while other threads are doing
* the sending only. If the polling thread runs on lower priority
* than the sending thread, then it's possible that the pending
* write flag is not cleared in-time because clearing is only done
* during polling.
*
* Aplication should specify multiple write operation keys on
* situation like this.
*/
//pj_assert(!"ioqueue: there is pending operation on this key!");
return PJ_EBUSY;
}
write_op->op = PJ_IOQUEUE_OP_SEND;
write_op->buf = (void*)data;
write_op->size = *length;
write_op->written = 0;
write_op->flags = flags;
pj_mutex_lock(key->mutex);
pj_list_insert_before(&key->write_list, write_op);
ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
pj_mutex_unlock(key->mutex);
return PJ_EPENDING;
}
/*
 * pj_ioqueue_sendto()
 *
 * Start an asynchronous sendto() on the descriptor.
 *
 * When no write is queued on the key, the data is first handed directly to
 * pj_sock_sendto(). Only if that call reports the "would block" status is
 * the request recorded in op_key, appended to the key's write list and the
 * key registered for WRITEABLE_EVENT.
 *
 * Parameters:
 *  key      Ioqueue key of the socket.
 *  op_key   Operation key; reinterpreted as the write_operation record
 *           that holds the pending request.
 *  data     Data to send. Only the pointer is stored for a pending send,
 *           so presumably the buffer must stay valid until the operation
 *           completes (confirm against the dispatch code).
 *  length   In: number of bytes to send. Out: number of bytes sent, but
 *           only when the data was sent immediately.
 *  flags    Send flags; PJ_IOQUEUE_ALWAYS_ASYNC is always cleared.
 *  addr     Destination address; copied into the operation record when
 *           the send is queued.
 *  addrlen  Length of addr in bytes.
 *
 * Returns:
 *  PJ_SUCCESS     data was sent immediately.
 *  PJ_EPENDING    the send has been queued.
 *  PJ_EBUSY       op_key still carries a pending operation.
 *  PJ_ECANCELLED  the key is closing.
 *  PJ_EINVAL      key, op_key, data or length is NULL.
 *  PJ_EBUG        addrlen does not fit the address storage.
 *  Any other status reported by pj_sock_sendto().
 */
PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
                                       const void *data,
                                       pj_ssize_t *length,
                                       pj_uint32_t flags,
                                       const pj_sockaddr_t *addr,
                                       int addrlen)
{
    struct write_operation *write_op;
    unsigned retry;
    pj_status_t status;
    pj_ssize_t sent;

    PJ_ASSERT_RETURN(key && op_key && data && length, PJ_EINVAL);
    PJ_CHECK_STACK();

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write */
    flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC);

    /* Fast track:
     *   Try to send data immediately, only if there's no pending write!
     * Note:
     *  We are speculating that the list is empty here without properly
     *  acquiring ioqueue's mutex first. This is intentional, to maximize
     *  performance via parallelism.
     *
     *  This should be safe, because:
     *      - by convention, we require caller to make sure that the
     *        key is not unregistered while other threads are invoking
     *        an operation on the same key.
     *      - pj_list_empty() is safe to be invoked by multiple threads,
     *        even when other threads are modifying the list.
     */
    if (pj_list_empty(&key->write_list)) {
        /* See if data can be sent immediately. */
        sent = *length;
        status = pj_sock_sendto(key->fd, data, &sent, flags, addr, addrlen);
        if (status == PJ_SUCCESS) {
            /* Success! */
            *length = sent;
            return PJ_SUCCESS;
        }

        /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
         * the error to caller. Otherwise fall through and queue the send.
         */
        if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
            return status;
        }
    }

    /*
     * Check that address storage can hold the address parameter.
     * The negative case is spelled out instead of relying on the implicit
     * signed-to-unsigned conversion in the comparison with sizeof.
     */
    PJ_ASSERT_RETURN(addrlen >= 0 &&
                     (unsigned)addrlen <= sizeof(pj_sockaddr_in), PJ_EBUG);

    /*
     * Schedule asynchronous send.
     */
    write_op = (struct write_operation*)op_key;

    /* Spin if write_op has pending operation */
    for (retry=0; write_op->op != 0 && retry<PENDING_RETRY; ++retry)
        pj_thread_sleep(0);

    /* Last chance */
    if (write_op->op) {
        /* Unable to send packet because there is already pending write on the
         * write_op. We could not put the operation into the write_op
         * because write_op already contains a pending operation! And
         * we could not send the packet directly with sendto() either,
         * because that will break the order of the packet. So we can
         * only return error here.
         *
         * This could happen for example in multithreaded programs,
         * where polling is done by one thread, while other threads are doing
         * the sending only. If the polling thread runs on lower priority
         * than the sending thread, then it's possible that the pending
         * write flag is not cleared in time because clearing is only done
         * during polling.
         *
         * Application should specify multiple write operation keys in
         * situations like this.
         */
        return PJ_EBUSY;
    }

    write_op->op = PJ_IOQUEUE_OP_SEND_TO;
    write_op->buf = (void*)data;
    write_op->size = *length;
    write_op->written = 0;
    write_op->flags = flags;
    pj_memcpy(&write_op->rmt_addr, addr, addrlen);
    write_op->rmt_addrlen = addrlen;

    pj_mutex_lock(key->mutex);

    /* Check again now that the key is locked: the first closing check was
     * made without the mutex, so the key may have started closing in the
     * meantime, and a closing key must not be put back into the event set.
     * NOTE(review): assumes the closing state is changed while key->mutex
     * is held -- confirm against the unregister path.
     */
    if (IS_CLOSING(key)) {
        /* The record was never linked into the list; release it. */
        write_op->op = 0;
        pj_mutex_unlock(key->mutex);
        return PJ_ECANCELLED;
    }

    pj_list_insert_before(&key->write_list, write_op);
    ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
#if PJ_HAS_TCP
/*
 * Initiate overlapped accept() operation.
 *
 * When no accept is queued on the key, pj_sock_accept() is tried right
 * away. If it reports the "would block" status, the request is recorded in
 * op_key, appended to the key's accept list and the key is registered for
 * READABLE_EVENT.
 *
 * Parameters:
 *  key       Ioqueue key of the listening socket.
 *  op_key    Operation key; reinterpreted as the accept_operation record.
 *  new_sock  Receives the accepted socket.
 *  local     Optional; receives the local address of the new socket. It is
 *            only filled on the immediate path when addrlen is also given.
 *  remote    Passed to pj_sock_accept() for the peer address.
 *  addrlen   Address length argument passed to pj_sock_accept() and
 *            pj_sock_getsockname().
 *
 * Only key, op_key and new_sock are validated; the pointers local, remote
 * and addrlen are stored as-is when the accept is queued.
 *
 * Returns:
 *  PJ_SUCCESS     a connection was accepted immediately.
 *  PJ_EPENDING    the accept has been queued.
 *  PJ_ECANCELLED  the key is closing.
 *  PJ_EINVAL      key, op_key or new_sock is NULL.
 *  Any other status reported by pj_sock_accept() or pj_sock_getsockname().
 */
PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key,
                                       pj_ioqueue_op_key_t *op_key,
                                       pj_sock_t *new_sock,
                                       pj_sockaddr_t *local,
                                       pj_sockaddr_t *remote,
                                       int *addrlen)
{
    struct accept_operation *accept_op;
    pj_status_t status;

    /* Check the mandatory parameters. */
    PJ_ASSERT_RETURN(key && op_key && new_sock, PJ_EINVAL);

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    accept_op = (struct accept_operation*)op_key;
    accept_op->op = 0;

    /* Fast track:
     *  See if there's new connection available immediately.
     */
    if (pj_list_empty(&key->accept_list)) {
        status = pj_sock_accept(key->fd, new_sock, remote, addrlen);
        if (status == PJ_SUCCESS) {
            /* Yes! New connection is available! */
            if (local && addrlen) {
                status = pj_sock_getsockname(*new_sock, local, addrlen);
                if (status != PJ_SUCCESS) {
                    /* Do not hand out a socket whose local address
                     * could not be obtained.
                     */
                    pj_sock_close(*new_sock);
                    *new_sock = PJ_INVALID_SOCKET;
                    return status;
                }
            }
            return PJ_SUCCESS;
        }

        /* If error is not EWOULDBLOCK (or EAGAIN on Linux), report
         * the error to caller. Otherwise fall through and queue the accept.
         */
        if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_ERROR_VAL)) {
            return status;
        }
    }

    /*
     * No connection is available immediately.
     * Schedule accept() operation to be completed when there is incoming
     * connection available.
     */
    accept_op->op = PJ_IOQUEUE_OP_ACCEPT;
    accept_op->accept_fd = new_sock;
    accept_op->rmt_addr = remote;
    accept_op->addrlen= addrlen;
    accept_op->local_addr = local;

    pj_mutex_lock(key->mutex);

    /* Check again now that the key is locked: the first closing check was
     * made without the mutex, so the key may have started closing in the
     * meantime, and a closing key must not be put back into the event set.
     * NOTE(review): assumes the closing state is changed while key->mutex
     * is held -- confirm against the unregister path.
     */
    if (IS_CLOSING(key)) {
        /* The record was never linked into the list; release it. */
        accept_op->op = 0;
        pj_mutex_unlock(key->mutex);
        return PJ_ECANCELLED;
    }

    pj_list_insert_before(&key->accept_list, accept_op);
    ioqueue_add_to_set(key->ioqueue, key, READABLE_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
/*
 * Initiate overlapped connect() operation (well, it's non-blocking actually,
 * since there's no overlapped version of connect()).
 *
 * pj_sock_connect() is called directly. When it reports the "connect in
 * progress" status, the key is flagged as connecting and registered for
 * both WRITEABLE_EVENT and EXCEPTION_EVENT.
 *
 * Parameters:
 *  key      Ioqueue key of the socket to connect.
 *  addr     Remote address to connect to.
 *  addrlen  Length of addr; must be non-zero.
 *
 * Returns:
 *  PJ_SUCCESS     the socket connected immediately.
 *  PJ_EPENDING    the connect is in progress, or the key was already
 *                 flagged as connecting by an earlier call.
 *  PJ_ECANCELLED  the key is closing.
 *  PJ_EINVAL      key or addr is NULL, or addrlen is zero.
 *  Any other status reported by pj_sock_connect().
 */
PJ_DEF(pj_status_t) pj_ioqueue_connect( pj_ioqueue_key_t *key,
                                        const pj_sockaddr_t *addr,
                                        int addrlen )
{
    pj_status_t status;

    /* check parameters. All must be specified! */
    PJ_ASSERT_RETURN(key && addr && addrlen, PJ_EINVAL);

    /* Check if key is closing. */
    if (IS_CLOSING(key))
        return PJ_ECANCELLED;

    /* A connect is already outstanding on this socket. */
    if (key->connecting != 0)
        return PJ_EPENDING;

    status = pj_sock_connect(key->fd, addr, addrlen);
    if (status == PJ_SUCCESS) {
        /* Connected! */
        return PJ_SUCCESS;
    }

    if (status != PJ_STATUS_FROM_OS(PJ_BLOCKING_CONNECT_ERROR_VAL)) {
        /* Error! */
        return status;
    }

    /* Pending! */
    pj_mutex_lock(key->mutex);

    /* Check again now that the key is locked: the first closing check was
     * made without the mutex, so the key may have started closing in the
     * meantime, and a closing key must not be put back into the event set.
     * NOTE(review): assumes the closing state is changed while key->mutex
     * is held -- confirm against the unregister path.
     */
    if (IS_CLOSING(key)) {
        pj_mutex_unlock(key->mutex);
        return PJ_ECANCELLED;
    }

    key->connecting = PJ_TRUE;
    ioqueue_add_to_set(key->ioqueue, key, WRITEABLE_EVENT);
    ioqueue_add_to_set(key->ioqueue, key, EXCEPTION_EVENT);
    pj_mutex_unlock(key->mutex);

    return PJ_EPENDING;
}
#endif /* PJ_HAS_TCP */
/*
 * pj_ioqueue_op_key_init()
 *
 * Reset an operation key by clearing `size` bytes starting at op_key
 * with pj_bzero().
 */
PJ_DEF(void) pj_ioqueue_op_key_init(pj_ioqueue_op_key_t *op_key,
                                    pj_size_t size)
{
    pj_bzero(op_key, size);
}
/*
 * pj_ioqueue_is_pending()
 *
 * Report whether op_key currently has an operation recorded in it, i.e.
 * whether its `op` field is non-zero. The key argument is not consulted.
 */
PJ_DEF(pj_bool_t) pj_ioqueue_is_pending( pj_ioqueue_key_t *key,
                                         pj_ioqueue_op_key_t *op_key )
{
    /* Every operation key is viewed through the generic record layout. */
    const struct generic_operation *pending =
        (const struct generic_operation*)op_key;

    PJ_UNUSED_ARG(key);

    return (pending->op != 0);
}
/*
* pj_ioqueue_post_completion()
*/
PJ_DEF(pj_status_t) pj_ioqueue_post_completion( pj_ioqueue_key_t *key,
pj_ioqueue_op_key_t *op_key,
pj_ssize_t bytes_status )
{
struct generic_operation *op_rec;
/*
* Find the operation key in all pending operation list to
* really make sure that it's still there; then call the callback.
*/
pj_mutex_lock(key->mutex);
/* Find the operation in the pending read list. */
op_rec = (struct generic_operation*)key->read_list.next;
while (op_rec != (void*)&key->read_list) {
if (op_rec == (void*)op_key) {
pj_list_erase(op_rec);
op_rec->op = 0;
pj_mutex_unlock(key->mutex);
(*key->cb.on_read_complete)(key, op_key, bytes_status);
return PJ_SUCCESS;
}
op_rec = op_rec->next;
}
/* Find the operation in the pending write list. */
op_rec = (struct generic_operation*)key->write_list.next;
while (op_rec != (void*)&key->write_list) {
if (op_rec == (void*)op_key) {
pj_list_erase(op_rec);
op_rec->op = 0;
pj_mutex_unlock(key->mutex);
(*key->cb.on_write_complete)(key, op_key, bytes_status);
return PJ_SUCCESS;
}
op_rec = op_rec->next;
}
/* Find the operation in the pending accept list. */
op_rec = (struct generic_operation*)key->accept_list.next;
while (op_rec != (void*)&key->accept_list) {
if (op_rec == (void*)op_key) {
pj_list_erase(op_rec);
op_rec->op = 0;
pj_mutex_unlock(key->mutex);
(*key->cb.on_accept_complete)(key, op_key,
PJ_INVALID_SOCKET,
bytes_status);
return PJ_SUCCESS;
}
op_rec = op_rec->next;
}
pj_mutex_unlock(key->mutex);
return PJ_EINVALIDOP;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -