📄 bb_boxc.c
字号:
/* ==================================================================== * The Kannel Software License, Version 1.0 * * Copyright (c) 2001-2004 Kannel Group * Copyright (c) 1998-2001 WapIT Ltd. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Kannel Group (http://www.kannel.org/)." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Kannel" and "Kannel Group" must not be used to * endorse or promote products derived from this software without * prior written permission. For written permission, please * contact org@kannel.org. * * 5. Products derived from this software may not be called "Kannel", * nor may "Kannel" appear in their name, without prior written * permission of the Kannel Group. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE KANNEL GROUP OR ITS CONTRIBUTORS * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Kannel Group. For more information on * the Kannel Group, please see <http://www.kannel.org/>. * * Portions of this software are based upon software originally written at * WapIT Ltd., Helsinki, Finland for the Kannel project. */ /* * bb_boxc.c : bearerbox box connection module * * handles start/restart/stop/suspend/die operations of the sms and * wapbox connections * * Kalle Marjola 2000 for project Kannel */#include <errno.h>#include <stdlib.h>#include <stdio.h>#include <time.h>#include <string.h>#include <sys/time.h>#include <sys/types.h>#include <sys/socket.h>#include <unistd.h>#include <signal.h>#include "gwlib/gwlib.h"#include "msg.h"#include "bearerbox.h"#include "bb_smscconn_cb.h"#define SMSBOX_MAX_PENDING 100/* passed from bearerbox core */extern volatile sig_atomic_t bb_status;extern volatile sig_atomic_t restart;extern List *incoming_sms;extern List *outgoing_sms;extern List *incoming_wdp;extern List *outgoing_wdp;extern List *flow_threads;extern List *suspended;/* incoming/outgoing sms queue control */extern long max_incoming_sms_qlength;/* our own thingies */static volatile sig_atomic_t smsbox_running;static volatile sig_atomic_t wapbox_running;static List *wapbox_list;static List *smsbox_list;static RWLock *smsbox_list_rwlock;/* dictionaries for holding the smsbox routing information */static Dict *smsbox_by_id;static Dict *smsbox_by_smsc;static Dict *smsbox_by_receiver;static long smsbox_port;static int smsbox_port_ssl;static long wapbox_port;static int wapbox_port_ssl;/* max pending messages on the line to smsbox */static long smsbox_max_pending;static Octstr *box_allow_ip;static Octstr *box_deny_ip;static Counter *boxid;/* sms_to_smsboxes thread-id */static long sms_dequeue_thread;typedef struct _boxc { Connection *conn; int is_wap; long id; int load; time_t connect_time; Octstr *client_ip; List *incoming; List *retry; /* If sending fails */ List *outgoing; Dict *sent; Semaphore *pending; volatile sig_atomic_t alive; Octstr *boxc_id; /* identifies the connected smsbox instance */ /* used to mark connection usable or still waiting for ident. msg */ volatile int routable;} Boxc;/* forward declaration */static void sms_to_smsboxes(void *arg);static int send_msg(Boxc *boxconn, Msg *pmsg);static void boxc_sent_push(Boxc*, Msg*);static void boxc_sent_pop(Boxc*, Msg*);/*------------------------------------------------- * receiver thingies */static Msg *read_from_box(Boxc *boxconn){ int ret; Octstr *pack; Msg *msg; pack = NULL; while (bb_status != BB_DEAD && boxconn->alive) { /* XXX: if box doesn't send (just keep conn open) we block here while shutdown */ pack = conn_read_withlen(boxconn->conn); gw_claim_area(pack); if (pack != NULL) break; if (conn_error(boxconn->conn)) { info(0, "Read error when reading from box <%s>, disconnecting", octstr_get_cstr(boxconn->client_ip)); return NULL; } if (conn_eof(boxconn->conn)) { info(0, "Connection closed by the box <%s>", octstr_get_cstr(boxconn->client_ip)); return NULL; } ret = conn_wait(boxconn->conn, -1.0); if (ret < 0) { error(0, "Connection to box <%s> broke.", octstr_get_cstr(boxconn->client_ip)); return NULL; } } if (pack == NULL) return NULL; msg = msg_unpack(pack); octstr_destroy(pack); if (msg == NULL) error(0, "Failed to unpack data!"); return msg;}/* * Try to deliver message to internal or smscconn queue * and generate ack/nack for smsbox connections. */static void deliver_sms_to_queue(Msg *msg, Boxc *conn){ Msg *mack, *mack_store; int rc; /* * save modifies ID and time, so if the smsbox uses it, save * it FIRST for the reply message!!! */ mack = msg_create(ack); gw_assert(mack != NULL); uuid_copy(mack->ack.id, msg->sms.id); mack->ack.time = msg->sms.time; store_save(msg); rc = smsc2_rout(msg); switch(rc) { case 1: mack->ack.nack = ack_success; break; case 0: mack->ack.nack = ack_buffered; break; case -1: /* no router at all */ warning(0, "Message rejected by bearerbox, no router!"); /* * first create nack for store-file, in order to delete * message from store-file. */ mack_store = msg_create(ack); gw_assert(mack_store != NULL); uuid_copy(mack_store->ack.id, msg->sms.id); mack_store->ack.time = msg->sms.time; mack->ack.nack = mack_store->ack.nack = ack_failed; store_save(mack_store); msg_destroy(mack_store); /* destroy original message */ msg_destroy(msg); break; } /* put ack into incoming queue of conn */ send_msg(conn, mack); msg_destroy(mack);}static void boxc_receiver(void *arg){ Boxc *conn = arg; Msg *msg, *mack; /* remove messages from socket until it is closed */ while (bb_status != BB_DEAD && conn->alive) { list_consume(suspended); /* block here if suspended */ msg = read_from_box(conn); if (msg == NULL) { /* garbage/connection lost */ conn->alive = 0; break; } /* we don't accept new messages in shutdown phase */ if ((bb_status == BB_SHUTDOWN || bb_status == BB_DEAD) && msg_type(msg) == sms) { mack = msg_create(ack); uuid_copy(mack->ack.id, msg->sms.id); mack->ack.time = msg->sms.time; mack->ack.nack = ack_failed_tmp; msg_destroy(msg); send_msg(conn, mack); msg_destroy(mack); continue; } if (msg_type(msg) == sms && conn->is_wap == 0) { debug("bb.boxc", 0, "boxc_receiver: sms received"); /* deliver message to queue */ deliver_sms_to_queue(msg, conn); if (conn->routable == 0) { conn->routable = 1; /* wakeup the dequeue thread */ gwthread_wakeup(sms_dequeue_thread); } } else if (msg_type(msg) == wdp_datagram && conn->is_wap) { debug("bb.boxc", 0, "boxc_receiver: got wdp from wapbox"); /* XXX we should block these in SHUTDOWN phase too, but we need ack/nack msgs implemented first. */ list_produce(conn->outgoing, msg); } else if (msg_type(msg) == sms && conn->is_wap) { debug("bb.boxc", 0, "boxc_receiver: got sms from wapbox"); /* should be a WAP push message, so tried it the same way */ deliver_sms_to_queue(msg, conn); if (conn->routable == 0) { conn->routable = 1; /* wakeup the dequeue thread */ gwthread_wakeup(sms_dequeue_thread); } } else { if (msg_type(msg) == heartbeat) { if (msg->heartbeat.load != conn->load) debug("bb.boxc", 0, "boxc_receiver: heartbeat with " "load value %ld received", msg->heartbeat.load); conn->load = msg->heartbeat.load; } else if (msg_type(msg) == ack) { boxc_sent_pop(conn, msg); store_save(msg); debug("bb.boxc", 0, "boxc_receiver: got ack"); } /* if this is an identification message from an smsbox instance */ else if (msg_type(msg) == admin && msg->admin.command == cmd_identify) { /* * any smsbox sends this command even if boxc_id is NULL, * but we will only consider real identified boxes */ if (msg->admin.boxc_id != NULL) { /* and add the boxc_id into conn for boxc_status() output */ if (conn->boxc_id != NULL) { dict_remove(smsbox_by_id, msg->admin.boxc_id); octstr_destroy(conn->boxc_id); } conn->boxc_id = msg->admin.boxc_id; msg->admin.boxc_id = NULL; /* add this identified smsbox to the dictionary */ /* XXX check for equal boxc_id in Dict, otherwise we overwrite it */ dict_put(smsbox_by_id, conn->boxc_id, conn); debug("bb.boxc", 0, "boxc_receiver: got boxc_id <%s> from <%s>", octstr_get_cstr(conn->boxc_id), octstr_get_cstr(conn->client_ip)); } conn->routable = 1; /* wakeup the dequeue thread */ gwthread_wakeup(sms_dequeue_thread); } else warning(0, "boxc_receiver: unknown msg received from <%s>, " "ignored", octstr_get_cstr(conn->client_ip)); msg_destroy(msg); } }}/*--------------------------------------------- * sender thingies */static int send_msg(Boxc *boxconn, Msg *pmsg){ Octstr *pack; pack = msg_pack(pmsg); if (pack == NULL) return -1; if (boxconn->boxc_id != NULL) debug("bb.boxc", 0, "send_msg: sending msg to boxc: <%s>", octstr_get_cstr(boxconn->boxc_id)); else debug("bb.boxc", 0, "send_msg: sending msg to box: <%s>", octstr_get_cstr(boxconn->client_ip)); if (conn_write_withlen(boxconn->conn, pack) == -1) { error(0, "Couldn't write Msg to box <%s>, disconnecting", octstr_get_cstr(boxconn->client_ip)); octstr_destroy(pack); return -1; } octstr_destroy(pack); return 0;}static void boxc_sent_push(Boxc *conn, Msg *m){ Octstr *os; char id[UUID_STR_LEN + 1]; if (conn->is_wap || !conn->sent || !m || msg_type(m) != sms) return; uuid_unparse(m->sms.id, id); os = octstr_create(id); dict_put(conn->sent, os, msg_duplicate(m)); semaphore_down(conn->pending); octstr_destroy(os);}static void boxc_sent_pop(Boxc *conn, Msg *m){ Octstr *os; char id[UUID_STR_LEN + 1]; Msg *msg; if (conn->is_wap || !conn->sent || !m || (msg_type(m) != ack && msg_type(m) != sms)) return; uuid_unparse((msg_type(m) == sms ? m->sms.id : m->ack.id), id); os = octstr_create(id); msg = dict_remove(conn->sent, os); octstr_destroy(os); if (!msg) { error(0, "BOXC: Got ack for nonexistend message!"); msg_dump(m, 0); return; } semaphore_up(conn->pending); msg_destroy(msg);}static void boxc_sender(void *arg){ Msg *msg; Boxc *conn = arg; list_add_producer(flow_threads); while (bb_status != BB_DEAD && conn->alive) { /* * Make sure there's no data left in the outgoing connection before * doing the potentially blocking list_consume()s */ conn_flush(conn->conn); list_consume(suspended); /* block here if suspended */ if ((msg = list_consume(conn->incoming)) == NULL) { /* tell sms/wapbox to die */ msg = msg_create(admin); msg->admin.command = restart ? cmd_restart : cmd_shutdown; send_msg(conn, msg); msg_destroy(msg); break; } if (msg_type(msg) == heartbeat) { debug("bb.boxc", 0, "boxc_sender: catch an heartbeat - we are alive"); msg_destroy(msg); continue; } boxc_sent_push(conn, msg); if (!conn->alive || send_msg(conn, msg) == -1) { /* we got message here */ boxc_sent_pop(conn, msg); list_produce(conn->retry, msg); break; } msg_destroy(msg); debug("bb.boxc", 0, "boxc_sender: sent message to <%s>", octstr_get_cstr(conn->client_ip)); } /* the client closes the connection, after that die in receiver */ /* conn->alive = 0; */ /* set conn to unroutable */ conn->routable = 0; list_remove_producer(flow_threads);}/*--------------------------------------------------------------- * accept/create/kill thingies */static Boxc *boxc_create(int fd, Octstr *ip, int ssl){ Boxc *boxc; boxc = gw_malloc(sizeof(Boxc)); boxc->is_wap = 0; boxc->load = 0; boxc->conn = conn_wrap_fd(fd, ssl); boxc->id = counter_increase(boxid); boxc->client_ip = ip; boxc->alive = 1; boxc->connect_time = time(NULL); boxc->boxc_id = NULL; boxc->routable = 0; return boxc;}static void boxc_destroy(Boxc *boxc){ if (boxc == NULL) return; /* do nothing to the lists, as they are only references */ if (boxc->conn) conn_destroy(boxc->conn); octstr_destroy(boxc->client_ip); octstr_destroy(boxc->boxc_id); gw_free(boxc);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -