📄 smsc_smpp.c
字号:
* sent queue cleanup. * @return 1 if io_thread should reconnect; 0 if not */static int do_queue_cleanup(SMPP *smpp, long *pending_submits, int action){ List *keys; Octstr *key; struct smpp_msg *smpp_msg; time_t now = time(NULL); if (*pending_submits <= 0) return 0; /* check if action set to wait ack for ever */ if (action == SMPP_WAITACK_NEVER_EXPIRE) return 0; keys = dict_keys(smpp->sent_msgs); if (keys == NULL) return 0; while ((key = list_extract_first(keys)) != NULL) { smpp_msg = dict_get(smpp->sent_msgs, key); if (smpp_msg != NULL && difftime(now, smpp_msg->sent_time) > smpp->wait_ack) { switch(action) { case SMPP_WAITACK_RECONNECT: /* reconnect */ /* found at least one not acked msg */ warning(0, "SMPP[%s]: Not ACKED message found, reconnecting.", octstr_get_cstr(smpp->conn->id)); octstr_destroy(key); list_destroy(keys, octstr_destroy_item); return 1; /* io_thread will reconnect */ case SMPP_WAITACK_REQUEUE: /* requeue */ smpp_msg = dict_remove(smpp->sent_msgs, key); if (smpp_msg != NULL) { warning(0, "SMPP[%s]: Not ACKED message found, will retransmit." " SENT<%ld>sec. ago, SEQ<%s>, DST<%s>", octstr_get_cstr(smpp->conn->id), (long)difftime(now, smpp_msg->sent_time) , octstr_get_cstr(key), octstr_get_cstr(smpp_msg->msg->sms.receiver)); bb_smscconn_send_failed(smpp->conn, smpp_msg->msg, SMSCCONN_FAILED_TEMPORARILY,NULL); smpp_msg_destroy(smpp_msg, 0); (*pending_submits)--; } default: error(0, "SMPP[%s] Unknown clenup action defined %xd.", octstr_get_cstr(smpp->conn->id), action); octstr_destroy(key); list_destroy(keys, octstr_destroy_item); return 0; } } octstr_destroy(key); } list_destroy(keys, octstr_destroy_item); return 0;}/* * This is the main function for the background thread for doing I/O on * one SMPP connection (the one for transmitting or receiving messages). * It makes the initial connection to the SMPP server and re-connects * if there are I/O errors or other errors that require it. */static void io_thread(void *arg){ SMPP *smpp; struct io_arg *io_arg; int transmitter; Connection *conn; int ret; long last_enquire_sent; long pending_submits; long len; SMPP_PDU *pdu; double timeout; time_t last_response, last_cleanup; io_arg = arg; smpp = io_arg->smpp; transmitter = io_arg->transmitter; gw_free(io_arg); /* Make sure we log into our own log-file if defined */ log_thread_to(smpp->conn->log_idx); conn = NULL; while (!smpp->quitting) { if (transmitter == 1) conn = open_transmitter(smpp); else if (transmitter == 2) conn = open_transceiver(smpp); else conn = open_receiver(smpp); last_enquire_sent = last_cleanup = last_response = date_universal_now(); pending_submits = -1; len = 0; smpp->throttling_err_time = 0; for (;conn != NULL;) { timeout = last_enquire_sent + smpp->enquire_link_interval - date_universal_now(); if (conn_wait(conn, timeout) == -1) break; /* unbind * Read so long as unbind_resp received or timeout passed. Otherwise we have * double delivered messages. */ if (smpp->quitting) { send_unbind(smpp, conn); last_response = time(NULL); while(conn_wait(conn, 1.00) != -1 && difftime(time(NULL), last_response) < SMPP_DEFAULT_SHUTDOWN_TIMEOUT && smpp->conn->status != SMSCCONN_DISCONNECTED) { if (read_pdu(smpp, conn, &len, &pdu) == 1) { dump_pdu("Got PDU:", smpp->conn->id, pdu); handle_pdu(smpp, conn, pdu, &pending_submits); smpp_pdu_destroy(pdu); } } debug("bb.sms.smpp", 0, "SMPP[%s]: %s: break and shutting down", octstr_get_cstr(smpp->conn->id), __PRETTY_FUNCTION__); break; } send_enquire_link(smpp, conn, &last_enquire_sent); while ((ret = read_pdu(smpp, conn, &len, &pdu)) == 1) { last_response = time(NULL); /* Deal with the PDU we just got */ dump_pdu("Got PDU:", smpp->conn->id, pdu); handle_pdu(smpp, conn, pdu, &pending_submits); smpp_pdu_destroy(pdu); /* * check if we are still connected. * Note: smsc can send unbind request, so we must check it. */ if (smpp->conn->status != SMSCCONN_ACTIVE && smpp->conn->status != SMSCCONN_ACTIVE_RECV) { /* we are disconnected */ ret = -1; break; } /* Make sure we send enquire_link even if we read a lot */ send_enquire_link(smpp, conn, &last_enquire_sent); /* Make sure we send even if we read a lot */ if (transmitter && difftime(time(NULL), smpp->throttling_err_time) > SMPP_THROTTLING_SLEEP_TIME) { smpp->throttling_err_time = 0; send_messages(smpp, conn, &pending_submits); } } if (ret == -1) { error(0, "SMPP[%s]: I/O error or other error. Re-connecting.", octstr_get_cstr(smpp->conn->id)); break; } /* if no PDU was received and connection timeout was set and over the limit */ if (ret == 0 && smpp->connection_timeout > 0 && difftime(time(NULL), last_response) > smpp->connection_timeout) { error(0, "SMPP[%s]: No responses from SMSC within %ld sec. Reconnecting.", octstr_get_cstr(smpp->conn->id), smpp->connection_timeout); break; } /* cleanup sent queue */ if (transmitter && difftime(time(NULL), last_cleanup) > smpp->wait_ack) { if (do_queue_cleanup(smpp, &pending_submits, smpp->wait_ack_action)) break; /* reconnect */ last_cleanup = time(NULL); } if (transmitter && difftime(time(NULL), smpp->throttling_err_time) > SMPP_THROTTLING_SLEEP_TIME) { smpp->throttling_err_time = 0; send_messages(smpp, conn, &pending_submits); } } if (conn != NULL) { conn_destroy(conn); conn = NULL; } /* * Put all queued messages back into global queue,so if * we have another link running then messages will be delivered * quickly. */ if (transmitter) { Msg *msg; struct smpp_msg *smpp_msg; List *noresp; Octstr *key; long reason = (smpp->quitting?SMSCCONN_FAILED_SHUTDOWN:SMSCCONN_FAILED_TEMPORARILY); while((msg = list_extract_first(smpp->msgs_to_send)) != NULL) bb_smscconn_send_failed(smpp->conn, msg, reason, NULL); noresp = dict_keys(smpp->sent_msgs); while((key = list_extract_first(noresp)) != NULL) { smpp_msg = dict_remove(smpp->sent_msgs, key); if (smpp_msg != NULL && smpp_msg->msg) { bb_smscconn_send_failed(smpp->conn, smpp_msg->msg, reason, NULL); smpp_msg_destroy(smpp_msg, 0); } octstr_destroy(key); } list_destroy(noresp, NULL); } /* * Reconnect if that was a connection problem. */ if (!smpp->quitting) { error(0, "SMPP[%s]: Couldn't connect to SMS center (retrying in %ld seconds).", octstr_get_cstr(smpp->conn->id), smpp->conn->reconnect_delay); mutex_lock(smpp->conn->flow_mutex); smpp->conn->status = SMSCCONN_RECONNECTING; mutex_unlock(smpp->conn->flow_mutex); gwthread_sleep(smpp->conn->reconnect_delay); } }}/*********************************************************************** * Functions called by smscconn.c via the SMSCConn function pointers. */static long queued_cb(SMSCConn *conn){ SMPP *smpp; smpp = conn->data; conn->load = (smpp ? (conn->status != SMSCCONN_DEAD ? list_len(smpp->msgs_to_send) : 0) : 0); return conn->load;}static int send_msg_cb(SMSCConn *conn, Msg *msg){ SMPP *smpp; smpp = conn->data; list_produce(smpp->msgs_to_send, msg_duplicate(msg)); gwthread_wakeup(smpp->transmitter); return 0;}static int shutdown_cb(SMSCConn *conn, int finish_sending){ SMPP *smpp; debug("bb.smpp", 0, "Shutting down SMSCConn %s (%s)", octstr_get_cstr(conn->name), finish_sending ? "slow" : "instant"); conn->why_killed = SMSCCONN_KILLED_SHUTDOWN; /* XXX implement finish_sending */ smpp = conn->data; smpp->quitting = 1; if (smpp->transmitter != -1) { gwthread_wakeup(smpp->transmitter); gwthread_join(smpp->transmitter); } if (smpp->receiver != -1) { gwthread_wakeup(smpp->receiver); gwthread_join(smpp->receiver); } smpp_destroy(smpp); debug("bb.smpp", 0, "SMSCConn %s shut down.", octstr_get_cstr(conn->name)); conn->status = SMSCCONN_DEAD; bb_smscconn_killed(); return 0;}/*********************************************************************** * Public interface. This version is suitable for the Kannel bearerbox * SMSCConn interface. */int smsc_smpp_create(SMSCConn *conn, CfgGroup *grp){ Octstr *host; long port; long receive_port; Octstr *username; Octstr *password; Octstr *system_id; Octstr *system_type; Octstr *address_range; long source_addr_ton; long source_addr_npi; long dest_addr_ton; long dest_addr_npi; Octstr *my_number; Octstr *service_type; SMPP *smpp; int ok; int transceiver_mode; Octstr *smsc_id; long enquire_link_interval; long max_pending_submits; long version; long priority; long smpp_msg_id_type; int autodetect_addr; Octstr *alt_charset; long connection_timeout, wait_ack, wait_ack_action; my_number = alt_charset = NULL; transceiver_mode = 0; autodetect_addr = 1; host = cfg_get(grp, octstr_imm("host")); if (cfg_get_integer(&port, grp, octstr_imm("port")) == -1) port = 0; if (cfg_get_integer(&receive_port, grp, octstr_imm("receive-port")) == -1) receive_port = 0; cfg_get_bool(&transceiver_mode, grp, octstr_imm("transceiver-mode")); username = cfg_get(grp, octstr_imm("smsc-username")); password = cfg_get(grp, octstr_imm("smsc-password")); system_type = cfg_get(grp, octstr_imm("system-type")); address_range = cfg_get(grp, octstr_imm("address-range")); my_number = cfg_get(grp, octstr_imm("my-number")); service_type = cfg_get(grp, octstr_imm("service-type")); system_id = cfg_get(grp, octstr_imm("system-id")); if (system_id != NULL) { warning(0, "SMPP: obsolete system-id variable is set, " "use smsc-username instead."); if (username == NULL) { warning(0, "SMPP: smsc-username not set, using system-id instead");
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -