📄 bb_boxc.c
字号:
static Boxc *accept_boxc(int fd, int ssl){ Boxc *newconn; Octstr *ip; int newfd; struct sockaddr_in client_addr; socklen_t client_addr_len; client_addr_len = sizeof(client_addr); newfd = accept(fd, (struct sockaddr *)&client_addr, &client_addr_len); if (newfd < 0) return NULL; ip = host_ip(client_addr); if (is_allowed_ip(box_allow_ip, box_deny_ip, ip) == 0) { info(0, "Box connection tried from denied host <%s>, disconnected", octstr_get_cstr(ip)); octstr_destroy(ip); close(newfd); return NULL; } newconn = boxc_create(newfd, ip, ssl); /* * check if the SSL handshake was successfull, otherwise * this is no valid box connection any more */#ifdef HAVE_LIBSSL if (ssl && !conn_get_ssl(newconn->conn)) return NULL;#endif if (ssl) info(0, "Client connected from <%s> using SSL", octstr_get_cstr(ip)); else info(0, "Client connected from <%s>", octstr_get_cstr(ip)); /* XXX TODO: do the hand-shake, baby, yeah-yeah! */ return newconn;}static void run_smsbox(void *arg){ int fd; Boxc *newconn; long sender; Msg *msg; List *keys; Octstr *key; list_add_producer(flow_threads); fd = (int)arg; newconn = accept_boxc(fd, smsbox_port_ssl); if (newconn == NULL) { list_remove_producer(flow_threads); return; } newconn->incoming = list_create(); list_add_producer(newconn->incoming); newconn->retry = incoming_sms; newconn->outgoing = outgoing_sms; newconn->sent = dict_create(smsbox_max_pending, NULL); newconn->pending = semaphore_create(smsbox_max_pending); sender = gwthread_create(boxc_sender, newconn); if (sender == -1) { error(0, "Failed to start a new thread, disconnecting client <%s>", octstr_get_cstr(newconn->client_ip)); goto cleanup; } /* * We register newconn in the smsbox_list here but mark newconn as routable * after identification or first message received from smsbox. So we can avoid * a race condition for routable smsboxes (otherwise between startup and * registration we will forward some messages to smsbox). */ gw_rwlock_wrlock(smsbox_list_rwlock); list_append(smsbox_list, newconn); gw_rwlock_unlock(smsbox_list_rwlock); list_add_producer(newconn->outgoing); boxc_receiver(newconn); list_remove_producer(newconn->outgoing); /* remove us from smsbox routing list */ gw_rwlock_wrlock(smsbox_list_rwlock); list_delete_equal(smsbox_list, newconn); if (newconn->boxc_id) { dict_remove(smsbox_by_id, newconn->boxc_id); } gw_rwlock_unlock(smsbox_list_rwlock); /* * check if we in the shutdown phase and sms dequeueing thread * has removed the producer already */ if (list_producer_count(newconn->incoming) > 0) list_remove_producer(newconn->incoming); /* check if we are still waiting for ack's and semaphore locked */ if (dict_key_count(newconn->sent) >= smsbox_max_pending) semaphore_up(newconn->pending); /* allow sender to go down */ gwthread_join(sender); /* put not acked msgs into incoming queue */ keys = dict_keys(newconn->sent); while((key = list_extract_first(keys)) != NULL) { msg = dict_remove(newconn->sent, key); list_produce(incoming_sms, msg); octstr_destroy(key); } gw_assert(list_len(keys) == 0); list_destroy(keys, octstr_destroy_item); /* clear our send queue */ while((msg = list_extract_first(newconn->incoming)) != NULL) { list_produce(incoming_sms, msg); }cleanup: gw_assert(list_len(newconn->incoming) == 0); list_destroy(newconn->incoming, NULL); gw_assert(dict_key_count(newconn->sent) == 0); dict_destroy(newconn->sent); semaphore_destroy(newconn->pending); boxc_destroy(newconn); /* wakeup the dequeueing thread */ gwthread_wakeup(sms_dequeue_thread); list_remove_producer(flow_threads);}static void run_wapbox(void *arg){ int fd; Boxc *newconn; List *newlist; long sender; list_add_producer(flow_threads); fd = (int)arg; newconn = accept_boxc(fd, wapbox_port_ssl); if (newconn == NULL) { list_remove_producer(flow_threads); return; } newconn->is_wap = 1; /* * create a new incoming list for just that box, * and add it to list of list pointers, so we can start * to route messages to it. */ debug("bb", 0, "setting up systems for new wapbox"); newlist = list_create(); list_add_producer(newlist); /* this is released by the sender/receiver if it exits */ newconn->incoming = newlist; newconn->retry = incoming_wdp; newconn->outgoing = outgoing_wdp; sender = gwthread_create(boxc_sender, newconn); if (sender == -1) { error(0, "Failed to start a new thread, disconnecting client <%s>", octstr_get_cstr(newconn->client_ip)); goto cleanup; } list_append(wapbox_list, newconn); list_add_producer(newconn->outgoing); boxc_receiver(newconn); /* cleanup after receiver has exited */ list_remove_producer(newconn->outgoing); list_lock(wapbox_list); list_delete_equal(wapbox_list, newconn); list_unlock(wapbox_list); while (list_producer_count(newlist) > 0) list_remove_producer(newlist); newconn->alive = 0; gwthread_join(sender);cleanup: gw_assert(list_len(newlist) == 0); list_destroy(newlist, NULL); boxc_destroy(newconn); list_remove_producer(flow_threads);}/*------------------------------------------------ * main single thread functions */typedef struct _addrpar { Octstr *address; int port; int wapboxid;} AddrPar;static void ap_destroy(AddrPar *addr){ octstr_destroy(addr->address); gw_free(addr);}static int cmp_route(void *ap, void *ms){ AddrPar *addr = ap; Msg *msg = ms; if (msg->wdp_datagram.source_port == addr->port && octstr_compare(msg->wdp_datagram.source_address, addr->address)==0) return 1; return 0;}static int cmp_boxc(void *bc, void *ap){ Boxc *boxc = bc; AddrPar *addr = ap; if (boxc->id == addr->wapboxid) return 1; return 0;}static Boxc *route_msg(List *route_info, Msg *msg){ AddrPar *ap; Boxc *conn, *best; int i, b, len; ap = list_search(route_info, msg, cmp_route); if (ap == NULL) { debug("bb.boxc", 0, "Did not find previous routing info for WDP, " "generating new");route: if (list_len(wapbox_list) == 0) return NULL; list_lock(wapbox_list); /* take random wapbox from list, and then check all wapboxes * and select the one with lowest load level - if tied, the first * one */ len = list_len(wapbox_list); b = gw_rand() % len; best = list_get(wapbox_list, b); for(i = 0; i < list_len(wapbox_list); i++) { conn = list_get(wapbox_list, (i+b) % len); if (conn != NULL && best != NULL) if (conn->load < best->load) best = conn; } if (best == NULL) { warning(0, "wapbox_list empty!"); list_unlock(wapbox_list); return NULL; } conn = best; conn->load++; /* simulate new client until we get new values */ ap = gw_malloc(sizeof(AddrPar)); ap->address = octstr_duplicate(msg->wdp_datagram.source_address); ap->port = msg->wdp_datagram.source_port; ap->wapboxid = conn->id; list_produce(route_info, ap); list_unlock(wapbox_list); } else conn = list_search(wapbox_list, ap, cmp_boxc); if (conn == NULL) { /* routing failed; wapbox has disappeared! * ..remove routing info and re-route */ debug("bb.boxc", 0, "Old wapbox has disappeared, re-routing"); list_delete_equal(route_info, ap); ap_destroy(ap); goto route; } return conn;}/* * this thread listens to incoming_wdp list * and then routs messages to proper wapbox */static void wdp_to_wapboxes(void *arg){ List *route_info; AddrPar *ap; Boxc *conn; Msg *msg; int i; list_add_producer(flow_threads); list_add_producer(wapbox_list); route_info = list_create(); while(bb_status != BB_DEAD) { list_consume(suspended); /* block here if suspended */ if ((msg = list_consume(incoming_wdp)) == NULL) break; gw_assert(msg_type(msg) == wdp_datagram); conn = route_msg(route_info, msg); if (conn == NULL) { warning(0, "Cannot route message, discard it"); msg_destroy(msg); continue; } list_produce(conn->incoming, msg); } debug("bb", 0, "wdp_to_wapboxes: destroying lists"); while((ap = list_extract_first(route_info)) != NULL) ap_destroy(ap); gw_assert(list_len(route_info) == 0); list_destroy(route_info, NULL); list_lock(wapbox_list); for(i=0; i < list_len(wapbox_list); i++) { conn = list_get(wapbox_list, i); list_remove_producer(conn->incoming); conn->alive = 0; } list_unlock(wapbox_list); list_remove_producer(wapbox_list); list_remove_producer(flow_threads);}static void wait_for_connections(int fd, void (*function) (void *arg), List *waited){ int ret; int timeout = 10; /* 10 sec. */ gw_assert(function != NULL); while(bb_status != BB_DEAD) { /* if we are being shutdowned, as long as there is * messages in incoming list allow new connections, but when * list is empty, exit. * Note: We have timeout (defined above) for which we allow new connections. * Otherwise we wait here for ever! */ if (bb_status == BB_SHUTDOWN) { ret = list_wait_until_nonempty(waited); if (ret == -1 || !timeout) break; else timeout--; } /* block here if suspended */ list_consume(suspended); ret = gwthread_pollfd(fd, POLLIN, 1.0); if (ret > 0) { gwthread_create(function, (void *)fd); gwthread_sleep(1.0); } else if (ret < 0) { if(errno==EINTR) continue; if(errno==EAGAIN) continue; error(errno, "wait_for_connections failed"); } }}static void smsboxc_run(void *arg){ int fd; int port; list_add_producer(flow_threads); gwthread_wakeup(MAIN_THREAD_ID); port = (int)arg; fd = make_server_socket(port, NULL); /* XXX add interface_name if required */ if (fd < 0) { panic(0, "Could not open smsbox port %d", port); } /* * infinitely wait for new connections; * to shut down the system, SIGTERM is send and then * select drops with error, so we can check the status */ wait_for_connections(fd, run_smsbox, incoming_sms); list_remove_producer(smsbox_list); /* continue avalanche */ list_remove_producer(outgoing_sms); /* all connections do the same, so that all must remove() before it * is completely over */ while(list_wait_until_nonempty(smsbox_list) == 1) gwthread_sleep(1.0); /* close listen socket */ close(fd); gwthread_wakeup(sms_dequeue_thread); gwthread_join(sms_dequeue_thread); list_destroy(smsbox_list, NULL); smsbox_list = NULL; gw_rwlock_destroy(smsbox_list_rwlock); smsbox_list_rwlock = NULL; /* destroy things related to smsbox routing */ dict_destroy(smsbox_by_id); smsbox_by_id = NULL; dict_destroy(smsbox_by_smsc); smsbox_by_smsc = NULL; dict_destroy(smsbox_by_receiver); smsbox_by_receiver = NULL; list_remove_producer(flow_threads);}static void wapboxc_run(void *arg){ int fd, port; list_add_producer(flow_threads); gwthread_wakeup(MAIN_THREAD_ID); port = (int)arg; fd = make_server_socket(port, NULL); /* XXX add interface_name if required */ if (fd < 0) { panic(0, "Could not open wapbox port %d", port); } wait_for_connections(fd, run_wapbox, incoming_wdp); /* continue avalanche */ list_remove_producer(outgoing_wdp); /* wait for all connections to die and then remove list */ while(list_wait_until_nonempty(wapbox_list) == 1) gwthread_sleep(1.0); /* wait for wdp_to_wapboxes to exit */ while(list_consume(wapbox_list)!=NULL) ; /* close listen socket */ close(fd); list_destroy(wapbox_list, NULL); wapbox_list = NULL; list_remove_producer(flow_threads);}/* * Populates the corresponding smsbox_by_foobar dictionary hash tables */static void init_smsbox_routes(Cfg *cfg){ CfgGroup *grp; List *list, *items; Octstr *boxc_id, *smsc_ids, *shortcuts; int i; boxc_id = smsc_ids = shortcuts = NULL; list = cfg_get_multi_group(cfg, octstr_imm("smsbox-route")); /* loop multi-group "smsbox-route" */ while (list && (grp = list_extract_first(list)) != NULL) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -