⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 bb_boxc.c

📁 The Kannel Open Source WAP and SMS gateway works as both an SMS gateway, for implementing keyword b
💻 C
📖 第 1 页 / 共 3 页
字号:
static Boxc *accept_boxc(int fd, int ssl){    Boxc *newconn;    Octstr *ip;    int newfd;    struct sockaddr_in client_addr;    socklen_t client_addr_len;    client_addr_len = sizeof(client_addr);    newfd = accept(fd, (struct sockaddr *)&client_addr, &client_addr_len);    if (newfd < 0)	    return NULL;    ip = host_ip(client_addr);    if (is_allowed_ip(box_allow_ip, box_deny_ip, ip) == 0) {        info(0, "Box connection tried from denied host <%s>, disconnected",                octstr_get_cstr(ip));        octstr_destroy(ip);        close(newfd);        return NULL;    }    newconn = boxc_create(newfd, ip, ssl);    /*     * check if the SSL handshake was successfull, otherwise     * this is no valid box connection any more     */#ifdef HAVE_LIBSSL     if (ssl && !conn_get_ssl(newconn->conn))        return NULL;#endif    if (ssl)        info(0, "Client connected from <%s> using SSL", octstr_get_cstr(ip));    else        info(0, "Client connected from <%s>", octstr_get_cstr(ip));    /* XXX TODO: do the hand-shake, baby, yeah-yeah! */    return newconn;}static void run_smsbox(void *arg){    int fd;    Boxc *newconn;    long sender;    Msg *msg;    List *keys;    Octstr *key;    list_add_producer(flow_threads);    fd = (int)arg;    newconn = accept_boxc(fd, smsbox_port_ssl);    if (newconn == NULL) {	    list_remove_producer(flow_threads);	    return;    }    newconn->incoming = list_create();    list_add_producer(newconn->incoming);    newconn->retry = incoming_sms;    newconn->outgoing = outgoing_sms;    newconn->sent = dict_create(smsbox_max_pending, NULL);    newconn->pending = semaphore_create(smsbox_max_pending);    sender = gwthread_create(boxc_sender, newconn);    if (sender == -1) {	    error(0, "Failed to start a new thread, disconnecting client <%s>",	          octstr_get_cstr(newconn->client_ip));	    goto cleanup;    }    /*     * We register newconn in the smsbox_list here but mark newconn as routable     * after identification or first message received from smsbox. So we can avoid     * a race condition for routable smsboxes (otherwise between startup and     * registration we will forward some messages to smsbox).     */    gw_rwlock_wrlock(smsbox_list_rwlock);    list_append(smsbox_list, newconn);    gw_rwlock_unlock(smsbox_list_rwlock);    list_add_producer(newconn->outgoing);    boxc_receiver(newconn);    list_remove_producer(newconn->outgoing);    /* remove us from smsbox routing list */    gw_rwlock_wrlock(smsbox_list_rwlock);    list_delete_equal(smsbox_list, newconn);    if (newconn->boxc_id) {        dict_remove(smsbox_by_id, newconn->boxc_id);    }    gw_rwlock_unlock(smsbox_list_rwlock);    /*     * check if we in the shutdown phase and sms dequeueing thread     *   has removed the producer already     */    if (list_producer_count(newconn->incoming) > 0)        list_remove_producer(newconn->incoming);    /* check if we are still waiting for ack's and semaphore locked */    if (dict_key_count(newconn->sent) >= smsbox_max_pending)        semaphore_up(newconn->pending); /* allow sender to go down */            gwthread_join(sender);    /* put not acked msgs into incoming queue */        keys = dict_keys(newconn->sent);    while((key = list_extract_first(keys)) != NULL) {        msg = dict_remove(newconn->sent, key);        list_produce(incoming_sms, msg);        octstr_destroy(key);    }    gw_assert(list_len(keys) == 0);    list_destroy(keys, octstr_destroy_item);    /* clear our send queue */    while((msg = list_extract_first(newconn->incoming)) != NULL) {        list_produce(incoming_sms, msg);    }cleanup:    gw_assert(list_len(newconn->incoming) == 0);    list_destroy(newconn->incoming, NULL);    gw_assert(dict_key_count(newconn->sent) == 0);    dict_destroy(newconn->sent);    semaphore_destroy(newconn->pending);    boxc_destroy(newconn);    /* wakeup the dequeueing thread */    gwthread_wakeup(sms_dequeue_thread);    list_remove_producer(flow_threads);}static void run_wapbox(void *arg){    int fd;    Boxc *newconn;    List *newlist;    long sender;    list_add_producer(flow_threads);    fd = (int)arg;    newconn = accept_boxc(fd, wapbox_port_ssl);    if (newconn == NULL) {	    list_remove_producer(flow_threads);	    return;    }    newconn->is_wap = 1;        /*     * create a new incoming list for just that box,     * and add it to list of list pointers, so we can start     * to route messages to it.     */    debug("bb", 0, "setting up systems for new wapbox");        newlist = list_create();    list_add_producer(newlist);  /* this is released by the     	    	    	    	    sender/receiver if it exits */        newconn->incoming = newlist;    newconn->retry = incoming_wdp;    newconn->outgoing = outgoing_wdp;    sender = gwthread_create(boxc_sender, newconn);    if (sender == -1) {	    error(0, "Failed to start a new thread, disconnecting client <%s>",	          octstr_get_cstr(newconn->client_ip));	    goto cleanup;    }    list_append(wapbox_list, newconn);    list_add_producer(newconn->outgoing);    boxc_receiver(newconn);    /* cleanup after receiver has exited */        list_remove_producer(newconn->outgoing);    list_lock(wapbox_list);    list_delete_equal(wapbox_list, newconn);    list_unlock(wapbox_list);    while (list_producer_count(newlist) > 0)	    list_remove_producer(newlist);    newconn->alive = 0;        gwthread_join(sender);cleanup:    gw_assert(list_len(newlist) == 0);    list_destroy(newlist, NULL);    boxc_destroy(newconn);    list_remove_producer(flow_threads);}/*------------------------------------------------ * main single thread functions */typedef struct _addrpar {    Octstr *address;    int	port;    int wapboxid;} AddrPar;static void ap_destroy(AddrPar *addr){    octstr_destroy(addr->address);    gw_free(addr);}static int cmp_route(void *ap, void *ms){    AddrPar *addr = ap;    Msg *msg = ms;        if (msg->wdp_datagram.source_port == addr->port  &&	    octstr_compare(msg->wdp_datagram.source_address, addr->address)==0)	return 1;    return 0;}static int cmp_boxc(void *bc, void *ap){    Boxc *boxc = bc;    AddrPar *addr = ap;    if (boxc->id == addr->wapboxid) return 1;        return 0;}static Boxc *route_msg(List *route_info, Msg *msg){    AddrPar *ap;    Boxc *conn, *best;    int i, b, len;        ap = list_search(route_info, msg, cmp_route);    if (ap == NULL) {	    debug("bb.boxc", 0, "Did not find previous routing info for WDP, "	    	  "generating new");route:	    if (list_len(wapbox_list) == 0)	        return NULL;	    list_lock(wapbox_list);	/* take random wapbox from list, and then check all wapboxes	 * and select the one with lowest load level - if tied, the first	 * one	 */	    len = list_len(wapbox_list);	    b = gw_rand() % len;	    best = list_get(wapbox_list, b);	    for(i = 0; i < list_len(wapbox_list); i++) {	        conn = list_get(wapbox_list, (i+b) % len);	        if (conn != NULL && best != NULL)		        if (conn->load < best->load)		            best = conn;	    }	    if (best == NULL) {	        warning(0, "wapbox_list empty!");	        list_unlock(wapbox_list);	        return NULL;	    }	    conn = best;	    conn->load++;	/* simulate new client until we get new values */		    ap = gw_malloc(sizeof(AddrPar));	    ap->address = octstr_duplicate(msg->wdp_datagram.source_address);	    ap->port = msg->wdp_datagram.source_port;	    ap->wapboxid = conn->id;	    list_produce(route_info, ap);	    list_unlock(wapbox_list);    } else	    conn = list_search(wapbox_list, ap, cmp_boxc);    if (conn == NULL) {	/* routing failed; wapbox has disappeared!	 * ..remove routing info and re-route   */	    debug("bb.boxc", 0, "Old wapbox has disappeared, re-routing");	    list_delete_equal(route_info, ap);	    ap_destroy(ap);	    goto route;    }    return conn;}/* * this thread listens to incoming_wdp list * and then routs messages to proper wapbox */static void wdp_to_wapboxes(void *arg){    List *route_info;    AddrPar *ap;    Boxc *conn;    Msg *msg;    int i;    list_add_producer(flow_threads);    list_add_producer(wapbox_list);    route_info = list_create();        while(bb_status != BB_DEAD) {	    list_consume(suspended);	/* block here if suspended */	    if ((msg = list_consume(incoming_wdp)) == NULL)	         break;	    gw_assert(msg_type(msg) == wdp_datagram);	    conn = route_msg(route_info, msg);	    if (conn == NULL) {	        warning(0, "Cannot route message, discard it");	        msg_destroy(msg);	        continue;	    }	    list_produce(conn->incoming, msg);    }    debug("bb", 0, "wdp_to_wapboxes: destroying lists");    while((ap = list_extract_first(route_info)) != NULL)	ap_destroy(ap);    gw_assert(list_len(route_info) == 0);    list_destroy(route_info, NULL);    list_lock(wapbox_list);    for(i=0; i < list_len(wapbox_list); i++) {	    conn = list_get(wapbox_list, i);	    list_remove_producer(conn->incoming);	    conn->alive = 0;    }    list_unlock(wapbox_list);    list_remove_producer(wapbox_list);    list_remove_producer(flow_threads);}static void wait_for_connections(int fd, void (*function) (void *arg),     	    	    	    	 List *waited){    int ret;    int timeout = 10; /* 10 sec. */    gw_assert(function != NULL);        while(bb_status != BB_DEAD) {	/* if we are being shutdowned, as long as there is	 * messages in incoming list allow new connections, but when	 * list is empty, exit.         * Note: We have timeout (defined above) for which we allow new connections.         *           Otherwise we wait here for ever!	 */	    if (bb_status == BB_SHUTDOWN) {	        ret = list_wait_until_nonempty(waited);	        if (ret == -1 || !timeout)                    break;                else                    timeout--;	    }            /* block here if suspended */            list_consume(suspended);            ret = gwthread_pollfd(fd, POLLIN, 1.0);	    if (ret > 0) {	        gwthread_create(function, (void *)fd);	        gwthread_sleep(1.0);	    } else if (ret < 0) {	        if(errno==EINTR) continue;	        if(errno==EAGAIN) continue;	        error(errno, "wait_for_connections failed");	    }    }}static void smsboxc_run(void *arg){    int fd;    int port;    list_add_producer(flow_threads);    gwthread_wakeup(MAIN_THREAD_ID);    port = (int)arg;        fd = make_server_socket(port, NULL);     	/* XXX add interface_name if required */    if (fd < 0) {	    panic(0, "Could not open smsbox port %d", port);    }    /*     * infinitely wait for new connections;     * to shut down the system, SIGTERM is send and then     * select drops with error, so we can check the status     */    wait_for_connections(fd, run_smsbox, incoming_sms);    list_remove_producer(smsbox_list);    /* continue avalanche */    list_remove_producer(outgoing_sms);    /* all connections do the same, so that all must remove() before it     * is completely over     */    while(list_wait_until_nonempty(smsbox_list) == 1)        gwthread_sleep(1.0);    /* close listen socket */    close(fd);    gwthread_wakeup(sms_dequeue_thread);    gwthread_join(sms_dequeue_thread);    list_destroy(smsbox_list, NULL);    smsbox_list = NULL;    gw_rwlock_destroy(smsbox_list_rwlock);    smsbox_list_rwlock = NULL;    /* destroy things related to smsbox routing */    dict_destroy(smsbox_by_id);    smsbox_by_id = NULL;    dict_destroy(smsbox_by_smsc);    smsbox_by_smsc = NULL;    dict_destroy(smsbox_by_receiver);    smsbox_by_receiver = NULL;        list_remove_producer(flow_threads);}static void wapboxc_run(void *arg){    int fd, port;    list_add_producer(flow_threads);    gwthread_wakeup(MAIN_THREAD_ID);    port = (int)arg;        fd = make_server_socket(port, NULL);    	/* XXX add interface_name if required */    if (fd < 0) {	    panic(0, "Could not open wapbox port %d", port);    }    wait_for_connections(fd, run_wapbox, incoming_wdp);    /* continue avalanche */    list_remove_producer(outgoing_wdp);    /* wait for all connections to die and then remove list     */        while(list_wait_until_nonempty(wapbox_list) == 1)        gwthread_sleep(1.0);    /* wait for wdp_to_wapboxes to exit */    while(list_consume(wapbox_list)!=NULL)	;        /* close listen socket */    close(fd);        list_destroy(wapbox_list, NULL);    wapbox_list = NULL;        list_remove_producer(flow_threads);}/* * Populates the corresponding smsbox_by_foobar dictionary hash tables */static void init_smsbox_routes(Cfg *cfg){    CfgGroup *grp;    List *list, *items;    Octstr *boxc_id, *smsc_ids, *shortcuts;    int i;    boxc_id = smsc_ids = shortcuts = NULL;    list = cfg_get_multi_group(cfg, octstr_imm("smsbox-route"));      /* loop multi-group "smsbox-route" */    while (list && (grp = list_extract_first(list)) != NULL) { 

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -