📄 stream.c
字号:
/* -------------------------------------------------------------------------- * * License * * The contents of this file are subject to the Jabber Open Source License * Version 1.0 (the "License"). You may not copy or use this file, in either * source code or executable form, except in compliance with the License. You * may obtain a copy of the License at http://www.jabber.com/license/ or at * http://www.opensource.org/. * * Software distributed under the License is distributed on an "AS IS" basis, * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License * for the specific language governing rights and limitations under the * License. * * Copyright (c) 2000-2001 Schuyler Heath <sheath@jabber.org> * * Acknowledgements * * Special thanks to the Jabber Open Source Contributors for their * suggestions and support of Jabber. * * -------------------------------------------------------------------------- */#include "stream.h"#include "utils.h"static char *scratch;static int scratch_sz;#define SCRATCH_INC 100#define PACKET_ADD(p,c,v) p = mt_realloc(p,sizeof(char *) * (c + 1)); p[c++] = vvoid mt_stream_free(mpstream st){ mphandler cur = st->head, tmp; log_debug(ZONE,"freeing stream %X",st); (st->cb)(NULL,st->arg); while (cur != NULL) { tmp = cur; cur = cur->next; mt_free(tmp); } if (st->buffer) mt_free(st->buffer); if (st->mp) { mt_free(st->mp->params); pool_free(st->mp->p); } mt_free(st);}char *mt_packet2str(mpacket mp){ int i; spool sp; sp = spool_new(mp->p); for (i = 0; i < mp->count; i++) { spool_add(sp,mp->params[i]); if (i + 1 < mp->count) spool_add(sp," "); } return spool_print(sp);}void mt_stream_packet(mpstream st, mpacket mp){ unsigned long trid;#ifdef PACKET_DEBUG log_debug(ZONE,"PACKET DUMP:\n%s",mt_packet2str(mp));#endif trid = atol(mt_packet_data(mp,1)); if (trid != 0) { mphandler cur = st->head, prev = NULL; while (st->closed == 0 && cur != NULL) { /* look for a packet registered handler */ if (cur->trid != trid) { prev = cur; cur = cur->next; continue; } log_debug(ZONE,"Packet handler found"); switch ((cur->cb)(mp,cur->arg)) { case r_ERR: log_error(NULL,"Error processing packet! %s",mt_packet2str(mp)); case r_DONE: if (prev == NULL) st->head = cur->next; else if ((prev->next = cur->next) == NULL) st->tail = prev; mt_free(cur); break; default: /* r_PASS is the only other result returned */ break; } log_debug(ZONE,"Packet handled"); goto done; } } /* there was no registered handler for this packet, or the stream is closing */ if ((st->cb)(mp,st->arg) == r_ERR) { log_debug(ZONE,"Default packet handler failed!"); }done: mt_free(mp->params); pool_free(mp->p);}int mt_stream_parse_msg(mpacket mp, int msg_len, char *buffer, int sz){ char *ptr, *tok, *data; const char *delim = "\r\n"; if (sz < msg_len) {/* not enough data */ log_debug(ZONE,"Split message packet %d %d",msg_len,sz); return 1; } data = pmalloc(mp->p,msg_len + 1 * sizeof(char)); memcpy(data,buffer,msg_len); data[msg_len] = '\0'; PACKET_ADD(mp->params,mp->count,data); ptr = strstr(data,"\r\n\r\n"); if (ptr == NULL) return -1; *ptr = '\0'; ptr += 4; tok = strtok(data,delim); while ((tok = strtok(NULL,delim)) != NULL) { PACKET_ADD(mp->params,mp->count,tok); } if (strcmp(ptr,"\r\n") == 0) ptr += 2; PACKET_ADD(mp->params,mp->count,ptr); return 0;}void mt_stream_parse(mpstream st, char *buffer, int sz){ mpacket mp = st->mp; char *part = buffer, **params, c; int count, i; if (mp != NULL) { params = mp->params; count = mp->count; } else { params = NULL; count = 0; } for (i = 0; i < sz; i++) { c = buffer[i]; if (c == ' ') { if (part == NULL) { log_debug(ZONE,"Parse error!"); continue; } if (mp == NULL) { pool p = pool_new(); mp = pmalloc(p,sizeof(_mpacket)); mp->p = p; } buffer[i] = '\0'; PACKET_ADD(params,count,pstrdup(mp->p,part)); part = NULL; } else if (c == '\r') { if (i + 1 == sz) break; if (params == NULL || part == NULL || mp == NULL) { log_debug(NULL,"Parse error %d %d %d",!params,!part,!mp);// abort(); if (params) mt_free(params); if (mp) pool_free(mp->p); return; } buffer[i++] = '\0'; /* NULL terminate the last parameter and increment */ PACKET_ADD(params,count,pstrdup(mp->p,part)); part = NULL; mp->params = params; mp->count = count; if (j_strcmp(params[0],"MSG") == 0) { int msg_len = atoi(params[3]); i++; /* skip the \n */ switch (mt_stream_parse_msg(mp,msg_len,buffer + i, sz - i)) { case 1: if (i != sz) part = buffer + i; st->msg_len = msg_len; goto done; case 0: i += (msg_len - 1); break; case -1: log_debug(ZONE,"Failed to parse message data! %d/%d %s",msg_len,sz - i,buffer); mt_free(params); pool_free(mp->p); return; } } /* finshed with this packet */ mt_stream_packet(st,mp); params = NULL; count = 0; mp = NULL; } else if (part == NULL) part = buffer + i; }done: if (part != NULL) { /* we only have part of this parameter */ assert(st->buffer == NULL); st->buffer = mt_strdup(part); st->bufsz = strlen(part); log_debug(ZONE,"Saved buffer %s",st->buffer); } if (mp != NULL) { assert(params && count); mp->count = count; mp->params = params; } st->mp = mp;}void mt_stream_more_msg(mpstream st, char *data, int sz){ mpacket mp = st->mp; int msg_len = st->msg_len; switch (mt_stream_parse_msg(mp,msg_len,data,sz)) { case 1: st->buffer = mt_strdup(data); st->bufsz = sz; break; case 0: st->mp = NULL; st->msg_len = 0; mt_stream_packet(st,mp); if ((sz -= msg_len) != 0) /* if we have any more data left, parse it */ mt_stream_parse(st,data + msg_len,sz); break; case -1: mt_free(mp->params); pool_free(mp->p); st->mp = NULL; st->msg_len = 0; break; }}void mt_stream_more(mpstream st, char *new, int sz){ char *data, *old = st->buffer; data = mt_malloc(sz + st->bufsz + 1); memcpy(data,old,st->bufsz); memcpy(data + st->bufsz,new,sz + 1); sz += st->bufsz; mt_free(old); st->buffer = NULL; st->bufsz = 0; if (st->msg_len) mt_stream_more_msg(st,data,sz); else mt_stream_parse(st,data,sz); mt_free(data);}void mt_stream_eat(mpstream st, char *data, int sz){ if (st->buffer) mt_stream_more(st,data,sz); else if (st->msg_len) mt_stream_more_msg(st,data,sz); else mt_stream_parse(st,data,sz);}static void mt_stream_read(mio m, int state, void *arg, char *buffer, int bufsz){ mpstream st = (mpstream) arg; switch (state) { case MIO_BUFFER: mt_stream_eat(st,buffer,bufsz); break; case MIO_CLOSED: mt_stream_free(st); break; }}static void mt_stream_connecting(mio m, int state, void *arg, char *buffer, int bufsz){ mpstream st = (mpstream) arg; switch (state) { case MIO_NEW: if (st->closed == 0) { log_debug(ZONE,"stream %X onnected",st); mio_karma(m,KARMA_INIT,KARMA_MAX,KARMA_INC,0,KARMA_PENALTY,KARMA_RESTORE); st->m = m; if (st->buffer != NULL) { /* write data which was buffered before we connected */ mio_write(m,NULL,st->buffer,st->bufsz); mt_free(st->buffer); st->buffer = NULL; st->bufsz = 0; } mio_reset(m,&mt_stream_read,(void *) st); } else { mio_close(m); } break; case MIO_CLOSED: mt_stream_free(st); break; }}void mt_stream_close(mpstream st){ st->closed = 1; if (st->m) mio_close(st->m);}void mt_stream_write(mpstream st, const char *fmt, ...){ va_list ap; int ret; while (1) { va_start(ap,fmt); ret = vsnprintf (scratch,scratch_sz,fmt,ap); va_end(ap);#ifdef PACKET_DEBUG log_debug(ZONE,"WROTE BUFFER %d/%d:%s",ret,scratch_sz,scratch);#endif if (ret == (scratch_sz - 1)) ret = -1; else if (ret > -1 && ret < scratch_sz) break; if (ret > -1) scratch_sz = ret + 1; else scratch_sz += SCRATCH_INC; scratch = mt_realloc(scratch,scratch_sz); assert(scratch != NULL); } ++st->trid; if (st->m == NULL) { assert(st->buffer == NULL); st->buffer = mt_strdup(scratch); st->bufsz = ret; } else mio_write(st->m,NULL,scratch,ret);}void mt_stream_register(mpstream st, handle cb, void *arg){ mphandler ph; ph = mt_malloc(sizeof(_mphandler)); ph->trid = st->trid; ph->cb = cb; ph->arg = arg; ph->next = NULL; if (st->head) st->tail = st->tail->next = ph; else st->head = st->tail = ph;}mpstream mt_stream_connect(char *host, int port, handle cb, void *arg){ mpstream st; st = mt_malloc(sizeof(_mpstream)); st->cb = cb; st->arg = arg; st->trid = (1 + (unsigned long) (15.0 * rand() / (RAND_MAX + 1.0))); st->m = NULL; st->head = st->tail = NULL; st->closed = 0; st->mp = NULL; st->buffer = NULL; st->bufsz = st->msg_len = 0; mio_connect(host,port,&mt_stream_connecting,(void *) st,0,NULL,NULL); return st;}void mt_stream_init(){ scratch_sz = 1024; scratch = mt_malloc(scratch_sz);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -