📄 xjab_worker.c
字号:
/* * $Id: xjab_worker.c,v 1.36.2.1 2005/06/16 10:30:08 andrei Exp $ * * eXtended JABber module - worker implementation * * * Copyright (C) 2001-2003 FhG Fokus * * This file is part of ser, a free SIP server. * * ser is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version * * For a license to use the ser software under conditions * other than those described here, or to purchase support for this * software, please contact iptel.org by e-mail at the following addresses: * info@iptel.org * * ser is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA *//*** * --- * * History * ------- * 2003-01-20 xj_worker_precess function cleaning - some part of it moved to * xj_worker_check_jcons function, (dcm) * 2003-02-28 send NOTIFYs even the connection is closed by user, (dcm) * 2003-03-11 major locking changes - uses locking.h, (andrei) * 2003-05-07 added new presence status - 'terminated' - when connection * with Jabber server is lost or closed, (dcm) * 2003-05-09 added new presence status - 'refused' - when the presence * subscription request is refused by target, (dcm) * 2003-05-09 new xj_worker_precess function cleaning - some part of it moved * to xj_worker_check_qmsg and xj_worker_check_watcher functions, * (dcm) * 2004-06-07 new DB api => xj_worker_process takes another parameter: dbf * (andrei) */#include <string.h>#include <unistd.h>#include <stdio.h>#include <sys/time.h>#include <sys/types.h>#include <fcntl.h>#include <errno.h>#include <signal.h>#include "../../dprint.h"#include "../../timer.h"#include "../../mem/mem.h"#include "../../mem/shm_mem.h"#include "../tm/tm_load.h"#include "xjab_worker.h"#include "xjab_util.h"#include "xjab_jcon.h"#include "xjab_dmsg.h"#include "xode.h"#include "xjab_presence.h"#include "mdefines.h"#define XJAB_RESOURCE "serXjab"#define XJ_ADDRTR_NUL 0#define XJ_ADDRTR_S2J 1#define XJ_ADDRTR_J2S 2#define XJ_ADDRTR_CON 4#define XJ_MSG_POOL_SIZE 10// proxy address#define _PADDR(a) ((a)->aliases->proxy)/** TM bind */extern struct tm_binds tmb;/** debug info */int _xj_pid = 0;int main_loop = 1;/** **/extern char *registrar;static str jab_gw_name = {"jabber_gateway@127.0.0.1", 24};/** * address correction * alias A~B: flag == 0 => A->B, otherwise B->A */int xj_address_translation(str *src, str *dst, xj_jalias als, int flag){ char *p, *p0; int i, ll; if(!src || !dst || !src->s || !dst->s ) return -1; if(!als || !als->jdm || !als->jdm->s || als->jdm->len <= 0) goto done; dst->len = 0;#ifdef XJ_EXTRA_DEBUG DBG("XJAB:xj_address_translation:%d: - checking aliases\n", _xj_pid);#endif p = src->s; while(p<(src->s + src->len) && *p != '@') p++; if(*p != '@') goto done; p++; ll = src->s + src->len - p;#ifdef XJ_EXTRA_DEBUG DBG("XJAB:xj_address_translation:%d: - domain is [%.*s]\n",_xj_pid,ll,p);#endif /*** checking aliases */ if(als->size > 0) { for(i=0; i<als->size; i++) if(als->a[i].len == ll && !strncasecmp(p, als->a[i].s, als->a[i].len)) { if(als->d[i]) { if(flag & XJ_ADDRTR_S2J) { strncpy(dst->s, src->s, src->len); p0 = dst->s; while(p0 < dst->s + (p-src->s)) { if(*p0 == als->dlm) *p0 = als->d[i]; p0++; } return 0; } if(flag & XJ_ADDRTR_J2S) { strncpy(dst->s, src->s, src->len); p0 = dst->s; while(p0 < dst->s + (p-src->s)) { if(*p0 == als->d[i]) *p0 = als->dlm; p0++; } return 0; } } goto done; } }#ifdef XJ_EXTRA_DEBUG DBG("XJAB:xj_address_translation:%d: - doing address correction\n", _xj_pid); #endif if(flag & XJ_ADDRTR_S2J) { if(als->jdm->len != ll || strncasecmp(p, als->jdm->s, als->jdm->len)) { DBG("XJA:xj_address_translation:%d: - wrong Jabber" " destination <%.*s>!\n", _xj_pid, src->len, src->s); return -1; } if(flag & XJ_ADDRTR_CON) {#ifdef XJ_EXTRA_DEBUG DBG("XJAB:xj_address_translation:%d: - that is for" " Jabber conference\n", _xj_pid);#endif p0 = p-1; while(p0 > src->s && *p0 != als->dlm) p0--; if(p0 <= src->s) return -1; p0--; while(p0 > src->s && *p0 != als->dlm) p0--; if(*p0 != als->dlm) return -1; dst->len = p - p0 - 2; strncpy(dst->s, p0+1, dst->len); dst->s[dst->len]=0; p = dst->s; while(p < (dst->s + dst->len) && *p!=als->dlm) p++; if(*p==als->dlm) *p = '@'; return 0; }#ifdef XJ_EXTRA_DEBUG DBG("XJAB:xj_address_translation:%d: - that is for" " Jabber network\n", _xj_pid);#endif dst->len = p - src->s - 1; strncpy(dst->s, src->s, dst->len); dst->s[dst->len]=0; if((p = strchr(dst->s, als->dlm)) != NULL) *p = '@'; else { DBG("XJA:xj_address_translation:%d: - wrong Jabber" " destination <%.*s>!!!\n", _xj_pid, src->len, src->s); return -1; } return 0; } if(flag & XJ_ADDRTR_J2S) { *(p-1) = als->dlm; p0 = src->s + src->len; while(p0 > p) { if(*p0 == '/') { src->len = p0 - src->s; *p0 = 0; } p0--; } strncpy(dst->s, src->s, src->len); dst->s[src->len] = '@'; dst->s[src->len+1] = 0; strncat(dst->s, als->jdm->s, als->jdm->len); dst->len = strlen(dst->s); return 0; }done: dst->s = src->s; dst->len = src->len; return 0; }/** * worker implementation * - jwl : pointer to the workers list * - jaddress : address of the jabber server * - jport : port of the jabber server * - rank : worker's rank * - db_con : connection to database * dbf: database module callbacks structure * #return : 0 on success or <0 on error */int xj_worker_process(xj_wlist jwl, char* jaddress, int jport, int rank, db_con_t* db_con, db_func_t* dbf){ int pipe, ret, i, pos, maxfd, flag; xj_jcon_pool jcp; struct timeval tmv; fd_set set, mset; xj_sipmsg jsmsg; str sto; xj_jcon jbc = NULL; xj_jconf jcf = NULL; char *p, buff[1024], recv_buff[4096]; int flags, nr, ltime = 0; db_key_t keys[] = {"sip_id", "type"}; db_val_t vals[2]; db_key_t col[] = {"jab_id", "jab_passwd"}; db_res_t* res = NULL; vals[0].type=DB_STRING; vals[0].nul=0; vals[0].val.string_val=buff; vals[1].type=DB_INT; vals[1].nul=0; vals[1].val.int_val=0; _xj_pid = getpid(); //signal(SIGTERM, xj_sig_handler); //signal(SIGINT, xj_sig_handler); //signal(SIGQUIT, xj_sig_handler); signal(SIGSEGV, xj_sig_handler); if(registrar) { jab_gw_name.s = registrar; jab_gw_name.len = strlen(registrar); if(registrar[0]== 's' && registrar[1]== 'i' && registrar[2]== 'p' && registrar[3]== ':') { jab_gw_name.s += 4; jab_gw_name.len -= 4; } } if(!jwl || !jwl->aliases || !jwl->aliases->jdm || !jaddress || rank >= jwl->len) { DBG("XJAB:xj_worker[%d]:%d: exiting - wrong parameters\n", rank, _xj_pid); return -1; } pipe = jwl->workers[rank].rpipe; DBG("XJAB:xj_worker[%d]:%d: started - pipe=<%d> : 1st message delay" " <%d>\n", rank, _xj_pid, pipe, jwl->delayt); if((jcp=xj_jcon_pool_init(jwl->maxj,XJ_MSG_POOL_SIZE,jwl->delayt))==NULL) { DBG("XJAB:xj_worker: cannot allocate the pool\n"); return -1; } maxfd = pipe; tmv.tv_sec = jwl->sleept; tmv.tv_usec = 0; FD_ZERO(&set); FD_SET(pipe, &set); while(main_loop) { mset = set; tmv.tv_sec = (jcp->jmqueue.size == 0)?jwl->sleept:1;#ifdef XJ_EXTRA_DEBUG //DBG("XJAB:xj_worker[%d]:%d: select waiting %ds - queue=%d\n",rank, // _xj_pid, (int)tmv.tv_sec, jcp->jmqueue.size);#endif tmv.tv_usec = 0; ret = select(maxfd+1, &mset, NULL, NULL, &tmv); // check the msg queue xj_worker_check_qmsg(jwl, jcp); if(ret <= 0) goto step_x;#ifdef XJ_EXTRA_DEBUG DBG("XJAB:xj_worker:%d: something is coming\n", _xj_pid);#endif if(!FD_ISSET(pipe, &mset)) goto step_y; if(read(pipe, &jsmsg, sizeof(jsmsg)) < (int)sizeof(jsmsg)) { DBG("XJAB:xj_worker:%d: BROKEN PIPE - exiting\n", _xj_pid); break; }#ifdef XJ_EXTRA_DEBUG DBG("XJAB:xj_worker:%d: job <%p> from SER\n", _xj_pid, jsmsg);#endif if(jsmsg == NULL || jsmsg->jkey==NULL || jsmsg->jkey->id==NULL) goto step_w; strncpy(buff, jsmsg->jkey->id->s, jsmsg->jkey->id->len); buff[jsmsg->jkey->id->len] = 0; jbc = xj_jcon_pool_get(jcp, jsmsg->jkey); switch(jsmsg->type) { case XJ_SEND_MESSAGE: if(!xj_jconf_check_addr(&jsmsg->to, jwl->aliases->dlm) && (!jbc||!xj_jcon_get_jconf(jbc,&jsmsg->to,jwl->aliases->dlm))) { xj_send_sip_msgz(_PADDR(jwl), jsmsg->jkey->id, &jsmsg->to, XJ_DMSG_ERR_NOTJCONF, NULL); goto step_w; } break; case XJ_REG_WATCHER: case XJ_JOIN_JCONF: case XJ_GO_ONLINE: break; case XJ_EXIT_JCONF: if(jbc == NULL) goto step_w; // close the conference session here if(jbc->nrjconf <= 0) goto step_w; if(!xj_jconf_check_addr(&jsmsg->to, jwl->aliases->dlm)) xj_jcon_del_jconf(jbc, &jsmsg->to, jwl->aliases->dlm, XJ_JCMD_UNSUBSCRIBE); xj_send_sip_msgz(_PADDR(jwl), jsmsg->jkey->id, &jsmsg->to, XJ_DMSG_INF_JCONFEXIT, NULL); goto step_w; case XJ_GO_OFFLINE: if(jbc != NULL) jbc->expire = ltime = -1; goto step_w; case XJ_DEL_WATCHER: default: goto step_w; } if(jbc != NULL) {#ifdef XJ_EXTRA_DEBUG DBG("XJAB:xj_worker:%d: connection already exists" " for <%s> ...\n", _xj_pid, buff);#endif xj_jcon_update(jbc, jwl->cachet); goto step_z; } // NO OPEN CONNECTION FOR THIS SIP ID#ifdef XJ_EXTRA_DEBUG DBG("XJAB:xj_worker:%d: new connection for <%s>.\n", _xj_pid, buff);#endif if(dbf->query(db_con, keys, 0, vals, col, 2, 2, NULL, &res) != 0 || RES_ROW_N(res) <= 0) {#ifdef XJ_EXTRA_DEBUG DBG("XJAB:xj_worker:%d: no database result when looking" " for associated Jabber account\n", _xj_pid);#endif xj_send_sip_msgz(_PADDR(jwl), jsmsg->jkey->id, &jsmsg->to, XJ_DMSG_ERR_JGWFORB, NULL); goto step_v; } jbc = xj_jcon_init(jaddress, jport); if(xj_jcon_connect(jbc)) { DBG("XJAB:xj_worker:%d: Cannot connect" " to the Jabber server ...\n", _xj_pid); xj_send_sip_msgz(_PADDR(jwl), jsmsg->jkey->id, &jsmsg->to, XJ_DMSG_ERR_NOJSRV, NULL); goto step_v; } #ifdef XJ_EXTRA_DEBUG DBG("XJAB:xj_worker: auth to jabber as: [%s] / [xxx]\n", (char*)(ROW_VALUES(RES_ROWS(res))[0].val.string_val));// (char*)(ROW_VALUES(RES_ROWS(res))[1].val.string_val));#endif if(xj_jcon_user_auth(jbc, (char*)(ROW_VALUES(RES_ROWS(res))[0].val.string_val), (char*)(ROW_VALUES(RES_ROWS(res))[1].val.string_val), XJAB_RESOURCE) < 0) { DBG("XJAB:xj_worker:%d: Authentication to the Jabber server" " failed ...\n", _xj_pid); xj_jcon_disconnect(jbc); xj_send_sip_msgz(_PADDR(jwl), jsmsg->jkey->id, &jsmsg->to, XJ_DMSG_ERR_JAUTH, NULL); xj_jcon_free(jbc); goto step_v; } if(xj_jcon_set_attrs(jbc, jsmsg->jkey, jwl->cachet, jwl->delayt) || xj_jcon_pool_add(jcp, jbc)) { DBG("XJAB:xj_worker:%d: Keeping connection to Jabber server" " failed! Not enough memory ...\n", _xj_pid); xj_jcon_disconnect(jbc); xj_send_sip_msgz(_PADDR(jwl), jsmsg->jkey->id, &jsmsg->to, XJ_DMSG_ERR_JGWFULL, NULL); xj_jcon_free(jbc); goto step_v; } /** add socket descriptor to select */#ifdef XJ_EXTRA_DEBUG DBG("XJAB:xj_worker:%d: add connection on <%d> \n", _xj_pid, jbc->sock);#endif if(jbc->sock > maxfd) maxfd = jbc->sock; FD_SET(jbc->sock, &set); xj_jcon_get_roster(jbc); xj_jcon_send_presence(jbc, NULL, NULL, "Online", "9"); /** wait for a while - the worker is tired */ //sleep(3); if ((res != NULL) && (dbf->free_result(db_con,res) < 0)) { DBG("XJAB:xj_worker:%d:Error while freeing" " SQL result - worker terminated\n", _xj_pid); return -1; } else res = NULL;step_z: if(jsmsg->type == XJ_GO_ONLINE) goto step_w; if(jsmsg->type == XJ_REG_WATCHER) { // update or register a presence watcher xj_worker_check_watcher(jwl, jcp, jbc, jsmsg); goto step_w; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -