📄 ioqueue_common_abs.c
字号:
/* $Id: ioqueue_common_abs.c 1158 2007-04-06 10:25:23Z bennylp $ *//* * Copyright (C)2003-2007 Benny Prijono <benny@prijono.org> * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA *//* * ioqueue_common_abs.c * * This contains common functionalities to emulate proactor pattern with * various event dispatching mechanisms (e.g. select, epoll). * * This file will be included by the appropriate ioqueue implementation. * This file is NOT supposed to be compiled as stand-alone source. */#define PENDING_RETRY 2static void ioqueue_init( pj_ioqueue_t *ioqueue ){ ioqueue->lock = NULL; ioqueue->auto_delete_lock = 0;}static pj_status_t ioqueue_destroy(pj_ioqueue_t *ioqueue){ if (ioqueue->auto_delete_lock && ioqueue->lock ) { pj_lock_release(ioqueue->lock); return pj_lock_destroy(ioqueue->lock); } else return PJ_SUCCESS;}/* * pj_ioqueue_set_lock() */PJ_DEF(pj_status_t) pj_ioqueue_set_lock( pj_ioqueue_t *ioqueue, pj_lock_t *lock, pj_bool_t auto_delete ){ PJ_ASSERT_RETURN(ioqueue && lock, PJ_EINVAL); if (ioqueue->auto_delete_lock && ioqueue->lock) { pj_lock_destroy(ioqueue->lock); } ioqueue->lock = lock; ioqueue->auto_delete_lock = auto_delete; return PJ_SUCCESS;}static pj_status_t ioqueue_init_key( pj_pool_t *pool, pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *key, pj_sock_t sock, void *user_data, const pj_ioqueue_callback *cb){ pj_status_t rc; int optlen; PJ_UNUSED_ARG(pool); key->ioqueue = ioqueue; key->fd = sock; key->user_data = user_data; pj_list_init(&key->read_list); pj_list_init(&key->write_list);#if PJ_HAS_TCP pj_list_init(&key->accept_list); key->connecting = 0;#endif /* Save callback. */ pj_memcpy(&key->cb, cb, sizeof(pj_ioqueue_callback));#if PJ_IOQUEUE_HAS_SAFE_UNREG /* Set initial reference count to 1 */ pj_assert(key->ref_count == 0); ++key->ref_count; key->closing = 0;#endif /* Get socket type. When socket type is datagram, some optimization * will be performed during send to allow parallel send operations. */ optlen = sizeof(key->fd_type); rc = pj_sock_getsockopt(sock, PJ_SOL_SOCKET, PJ_SO_TYPE, &key->fd_type, &optlen); if (rc != PJ_SUCCESS) key->fd_type = PJ_SOCK_STREAM; /* Create mutex for the key. */#if !PJ_IOQUEUE_HAS_SAFE_UNREG rc = pj_mutex_create_simple(pool, NULL, &key->mutex);#endif return rc;}/* * pj_ioqueue_get_user_data() * * Obtain value associated with a key. */PJ_DEF(void*) pj_ioqueue_get_user_data( pj_ioqueue_key_t *key ){ PJ_ASSERT_RETURN(key != NULL, NULL); return key->user_data;}/* * pj_ioqueue_set_user_data() */PJ_DEF(pj_status_t) pj_ioqueue_set_user_data( pj_ioqueue_key_t *key, void *user_data, void **old_data){ PJ_ASSERT_RETURN(key, PJ_EINVAL); if (old_data) *old_data = key->user_data; key->user_data = user_data; return PJ_SUCCESS;}PJ_INLINE(int) key_has_pending_write(pj_ioqueue_key_t *key){ return !pj_list_empty(&key->write_list);}PJ_INLINE(int) key_has_pending_read(pj_ioqueue_key_t *key){ return !pj_list_empty(&key->read_list);}PJ_INLINE(int) key_has_pending_accept(pj_ioqueue_key_t *key){#if PJ_HAS_TCP return !pj_list_empty(&key->accept_list);#else PJ_UNUSED_ARG(key); return 0;#endif}PJ_INLINE(int) key_has_pending_connect(pj_ioqueue_key_t *key){ return key->connecting;}#if PJ_IOQUEUE_HAS_SAFE_UNREG# define IS_CLOSING(key) (key->closing)#else# define IS_CLOSING(key) (0)#endif/* * ioqueue_dispatch_event() * * Report occurence of an event in the key to be processed by the * framework. */void ioqueue_dispatch_write_event(pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h){ /* Lock the key. */ pj_mutex_lock(h->mutex); if (IS_CLOSING(h)) { pj_mutex_unlock(h->mutex); return; }#if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0 if (h->connecting) { /* Completion of connect() operation */ pj_ssize_t bytes_transfered; /* Clear operation. */ h->connecting = 0; ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT); ioqueue_remove_from_set(ioqueue, h, EXCEPTION_EVENT);#if (defined(PJ_HAS_SO_ERROR) && PJ_HAS_SO_ERROR!=0) /* from connect(2): * On Linux, use getsockopt to read the SO_ERROR option at * level SOL_SOCKET to determine whether connect() completed * successfully (if SO_ERROR is zero). */ { int value; int vallen = sizeof(value); int gs_rc = pj_sock_getsockopt(h->fd, SOL_SOCKET, SO_ERROR, &value, &vallen); if (gs_rc != 0) { /* Argh!! What to do now??? * Just indicate that the socket is connected. The * application will get error as soon as it tries to use * the socket to send/receive. */ bytes_transfered = 0; } else { bytes_transfered = value; } }#elif defined(PJ_WIN32) && PJ_WIN32!=0 bytes_transfered = 0; /* success */#else /* Excellent information in D.J. Bernstein page: * http://cr.yp.to/docs/connect.html * * Seems like the most portable way of detecting connect() * failure is to call getpeername(). If socket is connected, * getpeername() will return 0. If the socket is not connected, * it will return ENOTCONN, and read(fd, &ch, 1) will produce * the right errno through error slippage. This is a combination * of suggestions from Douglas C. Schmidt and Ken Keys. */ { int gp_rc; struct sockaddr_in addr; socklen_t addrlen = sizeof(addr); gp_rc = getpeername(h->fd, (struct sockaddr*)&addr, &addrlen); bytes_transfered = (gp_rc < 0) ? gp_rc : -gp_rc; }#endif /* Unlock; from this point we don't need to hold key's mutex. */ pj_mutex_unlock(h->mutex); /* Call callback. */ if (h->cb.on_connect_complete && !IS_CLOSING(h)) (*h->cb.on_connect_complete)(h, bytes_transfered); /* Done. */ } else #endif /* PJ_HAS_TCP */ if (key_has_pending_write(h)) { /* Socket is writable. */ struct write_operation *write_op; pj_ssize_t sent; pj_status_t send_rc; /* Get the first in the queue. */ write_op = h->write_list.next; /* For datagrams, we can remove the write_op from the list * so that send() can work in parallel. */ if (h->fd_type == PJ_SOCK_DGRAM) { pj_list_erase(write_op); if (pj_list_empty(&h->write_list)) ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT); } /* Send the data. * Unfortunately we must do this while holding key's mutex, thus * preventing parallel write on a single key.. :-(( */ sent = write_op->size - write_op->written; if (write_op->op == PJ_IOQUEUE_OP_SEND) { send_rc = pj_sock_send(h->fd, write_op->buf+write_op->written, &sent, write_op->flags); /* Can't do this. We only clear "op" after we're finished sending * the whole buffer. */ //write_op->op = 0; } else if (write_op->op == PJ_IOQUEUE_OP_SEND_TO) { send_rc = pj_sock_sendto(h->fd, write_op->buf+write_op->written, &sent, write_op->flags, &write_op->rmt_addr, write_op->rmt_addrlen); /* Can't do this. We only clear "op" after we're finished sending * the whole buffer. */ //write_op->op = 0; } else { pj_assert(!"Invalid operation type!"); write_op->op = 0; send_rc = PJ_EBUG; } if (send_rc == PJ_SUCCESS) { write_op->written += sent; } else { pj_assert(send_rc > 0); write_op->written = -send_rc; } /* Are we finished with this buffer? */ if (send_rc!=PJ_SUCCESS || write_op->written == (pj_ssize_t)write_op->size || h->fd_type == PJ_SOCK_DGRAM) { write_op->op = 0; if (h->fd_type != PJ_SOCK_DGRAM) { /* Write completion of the whole stream. */ pj_list_erase(write_op); /* Clear operation if there's no more data to send. */ if (pj_list_empty(&h->write_list)) ioqueue_remove_from_set(ioqueue, h, WRITEABLE_EVENT); } /* No need to hold mutex anymore */ pj_mutex_unlock(h->mutex); /* Call callback. */ if (h->cb.on_write_complete && !IS_CLOSING(h)) { (*h->cb.on_write_complete)(h, (pj_ioqueue_op_key_t*)write_op, write_op->written); } } else { pj_mutex_unlock(h->mutex); } /* Done. */ } else { /* * This is normal; execution may fall here when multiple threads * are signalled for the same event, but only one thread eventually * able to process the event. */ pj_mutex_unlock(h->mutex); }}void ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ){ pj_status_t rc; /* Lock the key. */ pj_mutex_lock(h->mutex); if (IS_CLOSING(h)) { pj_mutex_unlock(h->mutex); return;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -