📄 smsc_cgw.c
字号:
host = cfg_get(cfg, octstr_imm("host")); appname = cfg_get(cfg, octstr_imm("appname")); if (cfg_get_integer(&our_port, cfg, octstr_imm("our-port")) == -1) privdata->our_port = 0; /* 0 means use any port */ else privdata->our_port = our_port; allow_ip = cfg_get(cfg, octstr_imm("connect-allow-ip")); if (allow_ip) deny_ip = octstr_create("*.*.*.*"); else deny_ip = NULL; if (cfg_get_integer(&waitack, cfg, octstr_imm("wait-ack")) < 0) privdata->waitack = 60; else privdata->waitack = waitack; if (privdata->port <= 0 || privdata->port > 65535) { info(1, "No port defined for cgw -> using default (%d)", CGW_DEFPORT); privdata->port = CGW_DEFPORT; } if (host == NULL) { error(0, "'host' missing in cgw configuration."); goto error; } if (appname == NULL) appname = octstr_create("send"); privdata->allow_ip = allow_ip; privdata->deny_ip = deny_ip; privdata->host = host; privdata->appname = appname; privdata->nexttrn = 0; privdata->check_time = 0; for (i = 0; i < CGW_TRN_MAX; i++) { privdata->sendtime[i] = 0; privdata->dlr[i] = 0; } if (privdata->rport > 0 && cgw_open_listening_socket(privdata) < 0) { gw_free(privdata); privdata = NULL; goto error; } conn->data = privdata; conn->name = octstr_format("CGW:%d", privdata->port); privdata->shutdown = 0; conn->status = SMSCCONN_CONNECTING; conn->connect_time = time(NULL); if (privdata->rport > 0 && (privdata->receiver_thread = gwthread_create(cgw_listener, conn)) == -1) goto error; if ((privdata->sender_thread = gwthread_create(cgw_sender, conn)) == -1) { privdata->shutdown = 1; goto error; } conn->shutdown = cgw_shutdown_cb; conn->queued = cgw_queued_cb; conn->start_conn = cgw_start_cb; conn->send_msg = cgw_add_msg_cb; return 0;error: error(0, "Failed to create CGW smsc connection"); if (privdata != NULL) list_destroy(privdata->outgoing_queue, NULL); gw_free(privdata); octstr_destroy(host); octstr_destroy(allow_ip); octstr_destroy(deny_ip); octstr_destroy(appname); conn->why_killed = SMSCCONN_KILLED_CANNOT_CONNECT; conn->status = SMSCCONN_DEAD; info(0, "exiting"); return -1;}/****************************************************************************** * Callbacks for startup, shutdown, incoming and outgoing messages */static int cgw_add_msg_cb(SMSCConn *conn, Msg *sms){ PrivData *privdata = conn->data; Msg *copy; copy = msg_duplicate(sms); list_produce(privdata->outgoing_queue, copy); gwthread_wakeup(privdata->sender_thread); return 0;}static int cgw_shutdown_cb(SMSCConn *conn, int finish_sending){ PrivData *privdata = conn->data; debug("bb.sms", 0, "Shutting down SMSCConn CGW, %s", finish_sending ? "slow" : "instant"); /* Documentation claims this would have been done by smscconn.c, but isn't when this code is being written. */ conn->why_killed = SMSCCONN_KILLED_SHUTDOWN; privdata->shutdown = 1; /* Separate from why_killed to avoid locking, as * why_killed may be changed from outside? */ if (finish_sending == 0) { Msg *msg; while ((msg = list_extract_first(privdata->outgoing_queue)) != NULL) { bb_smscconn_send_failed(conn, msg, SMSCCONN_FAILED_SHUTDOWN); } } if (privdata->rport > 0) gwthread_wakeup(privdata->receiver_thread); return 0;}static void cgw_start_cb(SMSCConn *conn){ PrivData *privdata = conn->data; /* in case there are messages in the buffer already */ if (privdata->rport > 0) gwthread_wakeup(privdata->receiver_thread); debug("smsc.cgw", 0, "smsc_cgw: start called");}static long cgw_queued_cb(SMSCConn *conn){ PrivData *privdata = conn->data; long ret = list_len(privdata->outgoing_queue); /* use internal queue as load, maybe something else later */ conn->load = ret; return ret;}/****************************************************************************** * This is the entry point for out sender thread. This function is responsible * for sending and acking messages in queue */static void cgw_sender(void *arg){ SMSCConn *conn = arg; PrivData *privdata = conn->data; Msg *msg = NULL; Connection *server = NULL; int l = 0; int ret = 0; conn->status = SMSCCONN_CONNECTING; while (!privdata->shutdown) { // check that connection is active if (conn->status != SMSCCONN_ACTIVE) { if ((server = cgw_open_send_connection(conn)) == NULL) { privdata->shutdown = 1; error(0, "Unable to connect to CGW server"); return ; } conn->status = SMSCCONN_ACTIVE; bb_smscconn_connected(conn); } else { ret = 0; l = list_len(privdata->outgoing_queue); if (l > 0) ret = cgw_send_loop(conn, server); /* send any messages in queue */ if (ret != -1) ret = cgw_wait_command(privdata, conn, server, 1); /* read ack's and delivery reports */ if (ret != -1) cgw_check_acks(privdata); /* check un-acked messages */ if (ret == -1) { mutex_lock(conn->flow_mutex); conn->status = SMSCCONN_RECONNECTING; mutex_unlock(conn->flow_mutex); } } } conn_destroy(server); while ((msg = list_extract_first(privdata->outgoing_queue)) != NULL) bb_smscconn_send_failed(conn, msg, SMSCCONN_FAILED_SHUTDOWN); mutex_lock(conn->flow_mutex); conn->status = SMSCCONN_DEAD; list_destroy(privdata->outgoing_queue, NULL); octstr_destroy(privdata->host); octstr_destroy(privdata->allow_ip); octstr_destroy(privdata->deny_ip); gw_free(privdata); conn->data = NULL; mutex_unlock(conn->flow_mutex); debug("bb.sms", 0, "smsc_cgw connection has completed shutdown."); bb_smscconn_killed();}static Connection *cgw_open_send_connection(SMSCConn *conn){ PrivData *privdata = conn->data; int wait; Connection *server; Msg *msg; wait = 0; while (!privdata->shutdown) { /* Change status only if the first attempt to form a * connection fails, as it's possible that the SMSC closed the * connection because of idle timeout and a new one will be * created quickly. */ if (wait) { if (conn->status == SMSCCONN_ACTIVE) { mutex_lock(conn->flow_mutex); conn->status = SMSCCONN_RECONNECTING; mutex_unlock(conn->flow_mutex); } while ((msg = list_extract_first(privdata->outgoing_queue))) bb_smscconn_send_failed(conn, msg, SMSCCONN_FAILED_TEMPORARILY); info(0, "smsc_cgw: waiting for %d minutes before trying to connect again", wait); gwthread_sleep(wait * 60); wait = wait > 5 ? 10 : wait * 2; } else wait = 1; server = conn_open_tcp_with_port(privdata->host, privdata->port, privdata->our_port, NULL /* privdata->our_host */); if (privdata->shutdown) { conn_destroy(server); return NULL; } if (server == NULL) { error(0, "smsc_cgw: opening TCP connection to %s failed", octstr_get_cstr(privdata->host)); continue; } if (conn->status != SMSCCONN_ACTIVE) { mutex_lock(conn->flow_mutex); conn->status = SMSCCONN_ACTIVE; conn->connect_time = time(NULL); mutex_unlock(conn->flow_mutex); bb_smscconn_connected(conn); } return server; } return NULL;}/****************************************************************************** * Send messages in queue. */static int cgw_send_loop(SMSCConn *conn, Connection *server){ PrivData *privdata = conn->data; struct cgwop *cgwop; Msg *msg; int firsttrn; /* Send messages in queue */ while ((msg = list_extract_first(privdata->outgoing_queue)) != NULL) { firsttrn = privdata->nexttrn; while (privdata->sendtime[privdata->nexttrn] != 0) { if (++privdata->nexttrn >= CGW_TRN_MAX) privdata->nexttrn = 0; if (privdata->nexttrn == firsttrn) { /* no available trn */ /* this happens too many messages are sent, and old messages * haven't been acked. In this case, increase size of * CGW_TRN_MAX */ info(0, "cgw: Saturated, increase size of CGW_TRN_MAX!"); list_produce(privdata->outgoing_queue, msg); return 1; /* re-insert, and go check for acks */ } } cgwop = msg_to_cgwop(privdata, msg, privdata->nexttrn); if (cgwop == NULL) { info(0, "cgw: cgwop == NULL"); return 0; } privdata->sendmsg[privdata->nexttrn] = msg; privdata->sendtime[privdata->nexttrn] = time(NULL); if (cgwop_send(server, cgwop) == -1) { cgwop_destroy(cgwop); info(0, "cgw: Unable to send (cgwop_send() == -1)"); return -1; } privdata->unacked++; cgwop_destroy(cgwop); } return 0;}/* Check whether there are messages the server hasn't acked in a * reasonable time */void cgw_check_acks(PrivData *privdata){ time_t current_time; int i; current_time = time(NULL); if (privdata->unacked && (current_time > privdata->check_time + 30)) { privdata->check_time = current_time; for (i = 0; i < CGW_TRN_MAX; i++) if (privdata->sendtime[i] && privdata->sendtime[i] < (current_time - privdata->waitack)) { privdata->sendtime[i] = 0; privdata->unacked--; warning(0, "smsc_cgw: received neither OK nor ERR for message %d " "in %d seconds, resending message", i, privdata->waitack); list_produce(privdata->outgoing_queue, privdata->sendmsg[i]); } }}/****************************************************************************** * cgw_wait_command - Used by cgw_sender thread to read delivery reports */int cgw_wait_command(PrivData *privdata, SMSCConn *conn, Connection *server, int timeout){ int ret; struct cgwop *cgwop; /* is there data to be read? */ ret = gwthread_pollfd(privdata->send_socket, POLLIN, 0.2); if (ret != -1) { /* read all waiting ops */ cgwop = cgw_read_op(privdata, conn, server, timeout); if (cgwop != NULL) { do { if (conn_eof(server)) { info(0, "cgw: Connection closed by SMSC"); conn->status = SMSCCONN_DISCONNECTED; if (cgwop != NULL) cgwop_destroy(cgwop); return -1; } if (conn_read_error(server)) { error(0, "cgw: Error trying to read ACKs from SMSC"); if (cgwop != NULL) cgwop_destroy(cgwop); return -1; } cgw_handle_op(conn, server, cgwop); cgwop_destroy(cgwop); } while ((cgwop = cgw_read_op(privdata, conn, server, timeout)) != NULL); } else conn_wait(server, 1); /* added because gwthread_pollfd seems to always return 1. This will keep the load on a reasonable level */ } return 0;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -