/* (web-viewer banner removed during review) */
/* $Id: ioqueue_common_abs.c 1158 2007-04-06 10:25:23Z bennylp $ */
/*
* Copyright (C)2003-2007 Benny Prijono <benny@prijono.org>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
/*
* ioqueue_common_abs.c
*
* This contains common functionalities to emulate proactor pattern with
* various event dispatching mechanisms (e.g. select, epoll).
*
* This file will be included by the appropriate ioqueue implementation.
* This file is NOT supposed to be compiled as stand-alone source.
*/
#define PENDING_RETRY 2
/*
 * Initialize the common part of a newly created ioqueue: no lock object
 * is installed yet, and the framework does not own one.
 */
static void ioqueue_init( pj_ioqueue_t *ioqueue )
{
    ioqueue->auto_delete_lock = 0;
    ioqueue->lock = NULL;
}
/*
 * Destroy the common part of the ioqueue. When the ioqueue owns its lock
 * (auto_delete_lock), the lock is released and destroyed; otherwise the
 * caller keeps ownership and there is nothing to clean up here.
 */
static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue)
{
    if (!ioqueue->auto_delete_lock || !ioqueue->lock)
        return PJ_SUCCESS;

    pj_lock_release(ioqueue->lock);
    return pj_lock_destroy(ioqueue->lock);
}
/*
 * pj_ioqueue_set_lock()
 *
 * Install a new lock object into the ioqueue. If a previous lock is
 * present and owned by the ioqueue, it is destroyed first.
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue,
                                         pj_lock_t *lock,
                                         pj_bool_t auto_delete )
{
    PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL);

    /* Dispose of the old lock only when we own it. */
    if (ioqueue->lock && ioqueue->auto_delete_lock)
        pj_lock_destroy(ioqueue->lock);

    ioqueue->auto_delete_lock = auto_delete;
    ioqueue->lock = lock;

    return PJ_SUCCESS;
}
/*
 * ioqueue_init_key()
 *
 * Initialize one registration key: bind it to the socket, reset the
 * pending read/write (and, on TCP builds, accept) operation lists,
 * store the application callbacks, and detect the socket type so that
 * datagram sockets can use parallel-send optimizations.
 *
 * @param pool      Pool for the key's mutex (unused when
 *                  PJ_IOQUEUE_HAS_SAFE_UNREG is enabled).
 * @param ioqueue   Owning ioqueue.
 * @param key       Key structure to initialize.
 * @param sock      Socket to register.
 * @param user_data Opaque application pointer stored in the key.
 * @param cb        Callbacks copied into the key.
 *
 * @return PJ_SUCCESS, or an error from mutex creation.
 */
static pj_status_t ioqueue_init_key( pj_pool_t *pool,
                                     pj_ioqueue_t *ioqueue,
                                     pj_ioqueue_key_t *key,
                                     pj_sock_t sock,
                                     void *user_data,
                                     const pj_ioqueue_callback *cb)
{
    pj_status_t rc;
    int optlen;

    PJ_UNUSED_ARG(pool);

    key->ioqueue = ioqueue;
    key->fd = sock;
    key->user_data = user_data;
    pj_list_init(&key->read_list);
    pj_list_init(&key->write_list);
#if PJ_HAS_TCP
    pj_list_init(&key->accept_list);
    key->connecting = 0;
#endif

    /* Save callback. */
    pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));

#if PJ_IOQUEUE_HAS_SAFE_UNREG
    /* Set initial reference count to 1 */
    pj_assert(key->ref_count == 0);
    ++key->ref_count;

    key->closing = 0;
#endif

    /* Get socket type. When socket type is datagram, some optimization
     * will be performed during send to allow parallel send operations.
     */
    optlen = sizeof(key->fd_type);
    rc = pj_sock_getsockopt(sock, PJ_SOL_SOCKET, PJ_SO_TYPE,
                            &key->fd_type, &optlen);
    if (rc != PJ_SUCCESS)
        key->fd_type = PJ_SOCK_STREAM;

    /* BUGFIX: a getsockopt() failure is tolerated above (we fall back to
     * stream semantics), so it must not leak out as this function's
     * return value. Without this reset, builds with
     * PJ_IOQUEUE_HAS_SAFE_UNREG (which skip the mutex creation below)
     * would wrongly report the whole registration as failed.
     */
    rc = PJ_SUCCESS;

    /* Create mutex for the key. */
#if !PJ_IOQUEUE_HAS_SAFE_UNREG
    rc = pj_mutex_create_simple(pool, NULL, &key->mutex);
#endif

    return rc;
}
/*
 * pj_ioqueue_get_user_data()
 *
 * Retrieve the opaque application pointer that was stored in the key.
 */
PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key )
{
    void *data;

    PJ_ASSERT_RETURN(key != NULL, NULL);

    data = key->user_data;
    return data;
}
/*
 * pj_ioqueue_set_user_data()
 *
 * Replace the opaque application pointer in the key, optionally
 * returning the previous value through old_data.
 */
PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key,
                                              void *user_data,
                                              void **old_data)
{
    PJ_ASSERT_RETURN(key, PJ_EINVAL);

    if (old_data != NULL)
        *old_data = key->user_data;

    key->user_data = user_data;
    return PJ_SUCCESS;
}
/* Nonzero when at least one send/sendto operation is still queued. */
PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key)
{
    return pj_list_empty(&key->write_list) ? 0 : 1;
}
/* Nonzero when at least one recv/recvfrom operation is still queued. */
PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key)
{
    return pj_list_empty(&key->read_list) ? 0 : 1;
}
/* Nonzero when an accept() operation is still queued (TCP builds only;
 * always zero otherwise).
 */
PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key)
{
#if PJ_HAS_TCP
    return pj_list_empty(&key->accept_list) ? 0 : 1;
#else
    PJ_UNUSED_ARG(key);
    return 0;
#endif
}
/* Nonzero while an asynchronous connect() is in progress on this key. */
PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key)
{
    int pending = key->connecting;
    return pending;
}
#if PJ_IOQUEUE_HAS_SAFE_UNREG
/* With safe unregistration, a key may be flagged "closing" while other
 * threads still reference it; dispatchers check this flag and ignore
 * further events on such a key. Without the feature, keys are never in
 * a closing state, so the check compiles away to 0.
 */
# define IS_CLOSING(key) (key->closing)
#else
# define IS_CLOSING(key) (0)
#endif
/*
 * ioqueue_dispatch_event()
 *
 * Report occurence of an event in the key to be processed by the
 * framework.
 *
 * Three cases are handled, in this order:
 *   1. completion of an asynchronous connect() (TCP builds only),
 *   2. progress/completion of a queued send/sendto operation,
 *   3. a spurious wakeup (another thread already consumed the event).
 *
 * The key's mutex is always released before any application callback is
 * invoked, so callbacks may safely re-enter the ioqueue API.
 */
void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h)
{
    /* Lock the key. */
    pj_mutex_lock(h->mutex);

    /* A key that is being unregistered must not be dispatched. */
    if (IS_CLOSING(h)) {
        pj_mutex_unlock(h->mutex);
        return;
    }

#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
    if (h->connecting) {
        /* Completion of connect() operation */
        pj_ssize_t bytes_transfered;

        /* Clear operation. */
        h->connecting = 0;

        /* The socket was placed in the write/exception sets only to
         * detect connect() completion; remove it from both again.
         */
        ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
        ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);

#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0)
        /* from connect(2):
         * On Linux, use getsockopt to read the SO_ERROR option at
         * level SOL_SOCKET to determine whether connect() completed
         * successfully (if SO_ERROR is zero).
         */
        {
            int value;
            int vallen = sizeof(value);
            int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR,
                                           &value, &vallen);
            if (gs_rc != 0) {
                /* Argh!! What to do now???
                 * Just indicate that the socket is connected. The
                 * application will get error as soon as it tries to use
                 * the socket to send/receive.
                 */
                bytes_transfered = 0;
            } else {
                /* Zero on success, otherwise the pending socket error
                 * code; the value is passed to the callback as-is.
                 */
                bytes_transfered = value;
            }
        }
#elif defined(PJ_WIN32) && PJ_WIN32!=0
        bytes_transfered = 0; /* success */
#else
        /* Excellent information in D.J. Bernstein page:
         * http://cr.yp.to/docs/connect.html
         *
         * Seems like the most portable way of detecting connect()
         * failure is to call getpeername(). If socket is connected,
         * getpeername() will return 0. If the socket is not connected,
         * it will return ENOTCONN, and read(fd, &ch, 1) will produce
         * the right errno through error slippage. This is a combination
         * of suggestions from Douglas C. Schmidt and Ken Keys.
         */
        {
            int gp_rc;
            struct sockaddr_in addr;
            socklen_t addrlen = sizeof(addr);

            gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen);
            /* getpeername() returns 0 or -1, so this yields 0 when
             * connected and a negative value otherwise.
             */
            bytes_transfered = (gp_rc < 0) ? gp_rc : -gp_rc;
        }
#endif

        /* Unlock; from this point we don't need to hold key's mutex. */
        pj_mutex_unlock(h->mutex);

        /* Call callback. */
        if (h->cb.on_connect_complete && !IS_CLOSING(h))
            (*h->cb.on_connect_complete)(h, bytes_transfered);

        /* Done. */

    } else
#endif /* PJ_HAS_TCP */
    if (key_has_pending_write(h)) {
        /* Socket is writable. */
        struct write_operation *write_op;
        pj_ssize_t sent;
        pj_status_t send_rc;

        /* Get the first in the queue. */
        write_op = h->write_list.next;

        /* For datagrams, we can remove the write_op from the list
         * so that send() can work in parallel.
         */
        if (h->fd_type == PJ_SOCK_DGRAM) {
            pj_list_erase(write_op);

            if (pj_list_empty(&h->write_list))
                ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
        }

        /* Send the data.
         * Unfortunately we must do this while holding key's mutex, thus
         * preventing parallel write on a single key.. :-((
         */
        sent = write_op->size - write_op->written;
        if (write_op->op == PJ_IOQUEUE_OP_SEND) {
            send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written,
                                   &sent, write_op->flags);
            /* Can't do this. We only clear "op" after we're finished sending
             * the whole buffer.
             */
            //write_op->op = 0;
        } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) {
            send_rc = pj_sock_sendto(h->fd,
                                     write_op->buf+write_op->written,
                                     &sent, write_op->flags,
                                     &write_op->rmt_addr,
                                     write_op->rmt_addrlen);
            /* Can't do this. We only clear "op" after we're finished sending
             * the whole buffer.
             */
            //write_op->op = 0;
        } else {
            pj_assert(!"Invalid operation type!");
            write_op->op = 0;
            send_rc = PJ_EBUG;
        }

        if (send_rc == PJ_SUCCESS) {
            write_op->written += sent;
        } else {
            /* NOTE(review): on failure the (positive) status code is
             * stored negated in "written" and reported to the callback
             * below, so callbacks receive a negative value on error.
             */
            pj_assert(send_rc > 0);
            write_op->written = -send_rc;
        }

        /* Are we finished with this buffer? */
        if (send_rc!=PJ_SUCCESS ||
            write_op->written == (pj_ssize_t)write_op->size ||
            h->fd_type == PJ_SOCK_DGRAM)
        {
            write_op->op = 0;

            if (h->fd_type != PJ_SOCK_DGRAM) {
                /* Write completion of the whole stream. */
                pj_list_erase(write_op);

                /* Clear operation if there's no more data to send. */
                if (pj_list_empty(&h->write_list))
                    ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT);
            }

            /* No need to hold mutex anymore */
            pj_mutex_unlock(h->mutex);

            /* Call callback. */
            if (h->cb.on_write_complete && !IS_CLOSING(h)) {
                (*h->cb.on_write_complete)(h,
                                           (pj_ioqueue_op_key_t*)write_op,
                                           write_op->written);
            }

        } else {
            /* Partial send on a stream socket: keep the operation at the
             * head of the queue and wait for the next writable event.
             */
            pj_mutex_unlock(h->mutex);
        }

        /* Done. */

    } else {
        /*
         * This is normal; execution may fall here when multiple threads
         * are signalled for the same event, but only one thread eventually
         * able to process the event.
         */
        pj_mutex_unlock(h->mutex);
    }
}
void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h )
{
pj_status_t rc;
/* Lock the key. */
pj_mutex_lock(h->mutex);
if (IS_CLOSING(h)) {
pj_mutex_unlock(h->mutex);
return;
/* NOTE(review): the remainder of this file -- the rest of
 * ioqueue_dispatch_read_event() and any subsequent definitions
 * (read dispatch, accept handling, exception dispatch, etc.) -- was
 * replaced by web-viewer UI text in this copy. Restore the tail of the
 * function from the original pjlib ioqueue_common_abs.c before use;
 * as it stands the function body above is unterminated.
 */