📄 smsc_emi.c
字号:
octstr_get_cstr(privdata->name)); for (i = 0; i < EMI2_MAX_TRN; i++) { if (privdata->slots[i].sendtime && privdata->slots[i].sendtype == 51) gw_prioqueue_produce(privdata->outgoing_queue, privdata->slots[i].sendmsg); privdata->slots[i].sendtime = 0; } privdata->unacked = 0;}/* * wait seconds seconds for something to happen (a send SMS request, activity * on the SMSC main connection, an error or timeout) and tell the caller * what happened. */static EMI2Event emi2_wait (SMSCConn *conn, Connection *server, double seconds){ if (emi2_can_send(conn) && gw_prioqueue_len(PRIVDATA(conn)->outgoing_queue)) { return EMI2_SENDREQ; } if (server != NULL) { switch (conn_wait(server, seconds)) { case 1: return gw_prioqueue_len(PRIVDATA(conn)->outgoing_queue) ? EMI2_SENDREQ : EMI2_TIMEOUT; case 0: return EMI2_SMSCREQ; default: return EMI2_CONNERR; } } else { gwthread_sleep(seconds); return gw_prioqueue_len(PRIVDATA(conn)->outgoing_queue) ? EMI2_SENDREQ : EMI2_TIMEOUT; }}/* * obtain the next free TRN. */static int emi2_next_trn (SMSCConn *conn){#define INC_TRN(x) ((x)=((x) + 1) % EMI2_MAX_TRN) int result; while (SLOTBUSY(conn,PRIVDATA(conn)->priv_nexttrn)) INC_TRN(PRIVDATA(conn)->priv_nexttrn); /* pick unused TRN */ result = PRIVDATA(conn)->priv_nexttrn; INC_TRN(PRIVDATA(conn)->priv_nexttrn); return result;#undef INC_TRN}/* * send an EMI type 31 message when required. */static int emi2_keepalive_handling (SMSCConn *conn, Connection *server){ struct emimsg *emimsg; int nexttrn = emi2_next_trn (conn); emimsg = make_emi31(PRIVDATA(conn), nexttrn); if(emimsg) { PRIVDATA(conn)->slots[nexttrn].sendtype= 31; PRIVDATA(conn)->slots[nexttrn].sendtime = time(NULL); PRIVDATA(conn)->unacked++; if (emi2_emimsg_send(conn, server, emimsg) == -1) { emimsg_destroy(emimsg); return -1; } emimsg_destroy(emimsg); } PRIVDATA(conn)->can_write = 0; return 0;}/* * the actual send logic: Send all queued messages in a burst. */static int emi2_do_send(SMSCConn *conn, Connection *server){ struct emimsg *emimsg; Msg *msg; double delay = 0; if (conn->throughput) { delay = 1.0 / conn->throughput; } /* Send messages if there's room in the sending window */ while (emi2_can_send(conn) && (msg = gw_prioqueue_remove(PRIVDATA(conn)->outgoing_queue)) != NULL) { int nexttrn = emi2_next_trn(conn); if (conn->throughput) gwthread_sleep(delay); /* convert the generic Kannel message into an EMI type message */ emimsg = msg_to_emimsg(msg, nexttrn, PRIVDATA(conn)); /* remember the message for retransmission or DLR */ PRIVDATA(conn)->slots[nexttrn].sendmsg = msg; PRIVDATA(conn)->slots[nexttrn].sendtype = 51; PRIVDATA(conn)->slots[nexttrn].sendtime = time(NULL); /* send the message */ if (emi2_emimsg_send(conn, server, emimsg) == -1) { emimsg_destroy(emimsg); return -1; } /* we just sent a message */ PRIVDATA(conn)->unacked++; emimsg_destroy(emimsg); /* * remember that there is an open request for stop-wait flow control * FIXME: couldn't this be done with the unacked field as well? After * all stop-wait is just a window of size 1. */ PRIVDATA(conn)->can_write = 0; } return 0;}static int emi2_handle_smscreq(SMSCConn *conn, Connection *server){ Octstr *str; struct emimsg *emimsg; PrivData *privdata = conn->data; /* Read acks/nacks/ops from the server */ while ((str = conn_read_packet(server, 2, 3))) { debug("smsc.emi2", 0, "EMI2[%s]: Got packet from the main socket", octstr_get_cstr(privdata->name)); /* parse the msg */ emimsg = get_fields(str, privdata->name); octstr_destroy(str); if (emimsg == NULL) { continue; /* The parse functions logged errors */ } if (emimsg->or == 'O') { /* If the SMSC wants to send operations through this * socket, we'll have to read them because there * might be ACKs too. We just drop them while stopped, * hopefully the SMSC will resend them later. */ if (!conn->is_stopped) { if (handle_operation(conn, server, emimsg) < 0) return -1; /* Connection broke */ } else { info(0, "EMI2[%s]: Ignoring operation from main socket " "because the connection is stopped.", octstr_get_cstr(privdata->name)); } } else { /* Already checked to be 'O' or 'R' */ if (!SLOTBUSY(conn,emimsg->trn) || emimsg->ot != PRIVDATA(conn)->slots[emimsg->trn].sendtype) { error(0, "EMI2[%s]: Got ack for TRN %d, don't remember sending O?", octstr_get_cstr(privdata->name), emimsg->trn); } else { PRIVDATA(conn)->can_write = 1; PRIVDATA(conn)->slots[emimsg->trn].sendtime = 0; PRIVDATA(conn)->unacked--; if (emimsg->ot == 51) { if (octstr_get_char(emimsg->fields[0], 0) == 'A') { /* we got an ack back. We might have to store the */ /* timestamp for delivery notifications now */ Octstr *ts, *adc; int i; Msg *m; ts = octstr_duplicate(emimsg->fields[2]); if (octstr_len(ts)) { i = octstr_search_char(ts,':',0); if (i>0) { octstr_delete(ts,0,i+1); adc = octstr_duplicate(emimsg->fields[2]); octstr_truncate(adc,i); m = PRIVDATA(conn)->slots[emimsg->trn].sendmsg; if(m == NULL) { info(0,"EMI2[%s]: uhhh m is NULL, very bad", octstr_get_cstr(privdata->name)); } else if (DLR_IS_ENABLED_DEVICE(m->sms.dlr_mask)) { dlr_add((conn->id ? conn->id : privdata->name), ts, m); } octstr_destroy(ts); octstr_destroy(adc); } else { octstr_destroy(ts); } } /* * report the successful transmission to the generic bb code. */ bb_smscconn_sent(conn, PRIVDATA(conn)->slots[emimsg->trn].sendmsg, NULL); } else { Octstr *reply; /* create reply message */ reply = octstr_create(""); octstr_append(reply, emimsg->fields[1]); octstr_append_char(reply, '-'); /* system message is optional */ if (emimsg->fields[2] != NULL) octstr_append(reply, emimsg->fields[2]); /* XXX Process error code here long errorcode; octstr_parse_long(&errorcode, emimsg->fields[1], 0, 10); ... switch(errorcode) ... } else { */ bb_smscconn_send_failed(conn, PRIVDATA(conn)->slots[emimsg->trn].sendmsg, SMSCCONN_FAILED_REJECTED, reply); /* } */ } } else if (emimsg->ot == 31) { /* XXX Process error codes here if (octstr_get_char(emimsg->fields[0], 0) == 'N') { long errorcode; octstr_parse_long(&errorcode, emimsg->fields[1], 0, 10); ... switch errorcode ... } else { */ ; /* } */ } else { panic(0, "EMI2[%s]: Bug, ACK handler missing for sent packet", octstr_get_cstr(privdata->name)); } } } emimsg_destroy(emimsg); } if (conn_error(server)) { error(0, "EMI2[%s]: Error trying to read ACKs from SMSC", octstr_get_cstr(privdata->name)); return -1; } if (conn_eof(server)) { info(0, "EMI2[%s]: Main connection closed by SMSC", octstr_get_cstr(privdata->name)); return -1; } return 0;}static void emi2_idleprocessing(SMSCConn *conn, Connection **server){ time_t current_time; int i; PrivData *privdata = conn->data; /* * Check whether there are messages the server hasn't acked in a * reasonable time */ current_time = time(NULL); if (PRIVDATA(conn)->unacked && (current_time > (PRIVDATA(conn)->check_time + 30))) { PRIVDATA(conn)->check_time = current_time; for (i = 0; i < PRIVDATA(conn)->window; i++) { if (SLOTBUSY(conn,i) && PRIVDATA(conn)->slots[i].sendtime < (current_time - PRIVDATA(conn)->waitack)) { if (PRIVDATA(conn)->slots[i].sendtype == 51) { if (PRIVDATA(conn)->waitack_expire == 0x00) { /* 0x00 - disconnect/reconnect */ warning(0, "EMI2[%s]: received neither ACK nor NACK for message %d " "in %d seconds, disconnecting and reconnection", octstr_get_cstr(privdata->name), i, PRIVDATA(conn)->waitack); PRIVDATA(conn)->slots[i].sendtime = 0; PRIVDATA(conn)->unacked--; info(0, "EMI2[%s]: closing connection.", octstr_get_cstr(privdata->name)); conn_destroy(*server); *server = NULL; break; } else if (PRIVDATA(conn)->waitack_expire == 0x01) { /* 0x01 - resend */ warning(0, "EMI2[%s]: received neither ACK nor NACK for message %d " "in %d seconds, resending message", octstr_get_cstr(privdata->name), i, PRIVDATA(conn)->waitack); gw_prioqueue_produce(PRIVDATA(conn)->outgoing_queue, PRIVDATA(conn)->slots[i].sendmsg); PRIVDATA(conn)->slots[i].sendtime = 0; PRIVDATA(conn)->unacked--; if (PRIVDATA(conn)->flowcontrol) PRIVDATA(conn)->can_write=1; /* Wake up this same thread to send again * (simpler than avoiding sleep) */ gwthread_wakeup(PRIVDATA(conn)->sender_thread); } else if (PRIVDATA(conn)->waitack_expire == 0x02) { /* 0x02 - carry on waiting */ warning(0, "EMI2[%s]: received neither ACK nor NACK for message %d " "in %d seconds, carrying on waiting", octstr_get_cstr(privdata->name), i, PRIVDATA(conn)->waitack); } } else if (PRIVDATA(conn)->slots[i].sendtype == 31) { warning(0, "EMI2[%s]: Alert (operation 31) was not " "ACKed within %d seconds", octstr_get_cstr(privdata->name), PRIVDATA(conn)->waitack); if (PRIVDATA(conn)->flowcontrol) PRIVDATA(conn)->can_write=1; } else { panic(0, "EMI2[%s]: Bug, no timeout handler for sent packet", octstr_get_cstr(privdata->name)); } } } }}static void emi2_idletimeout_handling (SMSCConn *conn, Connection **server){ PrivData *privdata = conn->data; /* * close the connection if there was no activity. */ if ((*server != NULL) && CONNECTIONIDLE(conn)) { info(0, "EMI2[%s]: closing idle connection.", octstr_get_cstr(privdata->name)); conn_destroy(*server); *server = NULL; }}/* * this function calculates the new timeouttime. */static double emi2_get_timeouttime (SMSCConn *conn, Connection *server){ double ka_timeouttime = PRIVDATA(conn)->keepalive ? PRIVDATA(conn)->keepalive + 1 : DBL_MAX; double idle_timeouttime = (PRIVDATA(conn)->idle_timeout && server) ? PRIVDATA(conn)->idle_timeout : DBL_MAX; double result = ka_timeouttime < idle_timeouttime ? ka_timeouttime : idle_timeouttime; if (result == DBL_MAX) result = 30; return result;}/* * the main event processing loop. */static void emi2_send_loop(SMSCConn *conn, Connection **server){ PrivData *privdata = conn->data; for (;;) { double timeouttime; EMI2Event event; if (emi2_needs_keepalive (conn)) { if (*server == NULL) { return; /* reopen the connection */ } emi2_keepalive_handling (conn, *server); } timeouttime = emi2_get_timeouttime (conn, *server); event = emi2_wait (conn, *server, timeouttime); switch (event) { case EMI2_CONNERR: return; case EMI2_SENDREQ: if (*server == NULL) { return; /* reopen the connection */ } if (emi2_do_send (conn, *server) < 0) { return; /* reopen the connection */ } break; case EMI2_SMSCREQ: if (emi2_handle_smscreq (conn, *server) < 0) { return; /* reopen the connection */ } break; case EMI2_TIMEOUT: break; } if ((*server !=NULL) && (emi2_handle_smscreq (conn, *server) < 0)) { return; /* reopen the connection */ } emi2_idleprocessing (conn, server); emi2_idletimeout_handling (conn, server); if (PRIVDATA(conn)->shutdown && (PRIVDATA(conn)->unacked == 0)) { /* shutdown and no open messages */ break; } if (*server != NULL) { if (conn_error(*server)) { warning(0, "EMI2[%s]: Error reading from the main connection", octstr_get_cstr(privdata->name)); break; } if (conn_eof(*server)) { info(0, "EMI2[%s]: Main connection closed by SMSC", octstr_get_cstr(privdata->name)); break; } } }}static void emi2_sender(void *arg){ SMSCConn *conn = arg; PrivData *privdata = conn->data; Msg *msg; Connection *server; /* Make sure we log into our own log-file if defined */ log_thread_to(conn->log_idx); while (!privdata->shutdown) { if ((server = open_send_connection(conn)) == NULL) { privdata->shutdown = 1; if (privdata->rport > 0) gwthread_wakeup(privdata->receiver_thread); break; } emi2_send_loop(conn, &server); clear_sent(privdata); if (server != NULL) { conn_destroy(server);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -