📄 io.c
字号:
/* * jabberd - Jabber Open Source Server * Copyright (c) 2002 Jeremie Miller, Thomas Muldowney, * Ryan Eatmon, Robert Norris * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA02111-1307USA */#include "sx.h"/** handler for read data */void _sx_process_read(sx_t s, sx_buf_t buf) { sx_error_t sxe; nad_t nad; char *errstring; int i; /* Note that buf->len can validly be 0 here, if we got data from the socket but the plugin didn't return anything to us (e.g. a SSL packet was split across a tcp segment boundary) */ /* parse it */ if(XML_Parse(s->expat, buf->data, buf->len, 0) == 0) { /* only report error we haven't already */ if(!s->fail) { /* parse error */ errstring = (char *) XML_ErrorString(XML_GetErrorCode(s->expat)); _sx_gen_error(sxe, SX_ERR_XML_PARSE, "XML parse error", errstring); _sx_event(s, event_ERROR, (void *) &sxe); _sx_error(s, stream_err_XML_NOT_WELL_FORMED, errstring); _sx_close(s); return; } /* !!! is this the right thing to do? we should probably set * s->fail and let the code further down handle it. */ _sx_buffer_free(buf); return; } /* done with the buffer */ _sx_buffer_free(buf); /* process completed nads */ if(s->state >= state_STREAM) while((nad = jqueue_pull(s->rnadq)) != NULL) { int plugin_error;#ifdef SX_DEBUG char *out; int len; nad_print(nad, 0, &out, &len); _sx_debug(ZONE, "completed nad: %.*s", len, out);#endif /* check for errors */ if(NAD_ENS(nad, 0) >= 0 && NAD_NURI_L(nad, NAD_ENS(nad, 0)) == strlen(uri_STREAMS) && strncmp(NAD_NURI(nad, NAD_ENS(nad, 0)), uri_STREAMS, strlen(uri_STREAMS)) == 0 && NAD_ENAME_L(nad, 0) == 5 && strncmp(NAD_ENAME(nad, 0), "error", 5) == 0) { errstring = NULL; if(NAD_CDATA_L(nad, 0) > 0) { errstring = (char *) malloc(sizeof(char) * (NAD_CDATA_L(nad, 0) + 1)); sprintf(errstring, "%.*s", NAD_CDATA_L(nad, 0), NAD_CDATA(nad, 0)); } _sx_gen_error(sxe, SX_ERR_STREAM, "Stream error", errstring); _sx_event(s, event_ERROR, (void *) &sxe); if(errstring != NULL) free(errstring); _sx_state(s, state_CLOSING); nad_free(nad); break; } /* run it by the plugins */ if(_sx_chain_nad_read(s, nad) == 0) return; /* now let the plugins process the completed nad */ plugin_error = 0; if(s->env != NULL) for(i = 0; i < s->env->nplugins; i++) if(s->env->plugins[i]->process != NULL) { int plugin_ret; plugin_ret = (s->env->plugins[i]->process)(s, s->env->plugins[i], nad); if(plugin_ret == 0) { plugin_error ++; break; } } /* hand it to the app */ if ((plugin_error == 0) && (s->state < state_CLOSING)) _sx_event(s, event_PACKET, (void *) nad); } /* something went wrong, bail */ if(s->fail) { _sx_close(s); return; } /* stream was closed */ if(s->depth < 0 && s->state < state_CLOSING) { /* close the stream if necessary */ if(s->state >= state_STREAM_SENT) { jqueue_push(s->wbufq, _sx_buffer_new("</stream:stream>", 16, NULL, NULL), 0); s->want_write = 1; } _sx_state(s, state_CLOSING); return; }}/** we can read */int sx_can_read(sx_t s) { sx_buf_t in, out; int read, ret; assert((int) s); /* do we care? */ if(!s->want_read && s->state < state_CLOSING) return 0; /* no more thanks */ _sx_debug(ZONE, "%d ready for reading", s->tag); /* new buffer */ in = _sx_buffer_new(NULL, 1024, NULL, NULL); /* get them to read stuff */ read = _sx_event(s, event_READ, (void *) in); /* bail if something went wrong */ if(read < 0) { _sx_buffer_free(in); s->want_read = 0; s->want_write = 0; return 0; } /* EOF if we got a 0-byte read from the socket */ if(read == 0) /* they went away */ _sx_state(s, state_CLOSING); else { _sx_debug(ZONE, "passed %d read bytes", in->len); /* make a copy for processing */ out = _sx_buffer_new(in->data, in->len, in->notify, in->notify_arg); /* run it by the plugins */ ret = _sx_chain_io_read(s, out); if(ret <= 0) { if(ret < 0) { /* permanent failure, its all over */ /* !!! shut down */ s->want_read = s->want_write = 0; } _sx_buffer_free(in); _sx_buffer_free(out); /* done */ if(s->want_write) _sx_event(s, event_WANT_WRITE, NULL); return s->want_read; } _sx_buffer_free(in); _sx_debug(ZONE, "decoded read data (%d bytes): %.*s", out->len, out->len, out->data); /* into the parser with you */ _sx_process_read(s, out); } /* if we've written everything, and we're closed, then inform the app it can kill us */ if(s->want_write == 0 && s->state == state_CLOSING) { _sx_state(s, state_CLOSED); _sx_event(s, event_CLOSED, NULL); return 0; } if(s->state == state_CLOSED) return 0; if(s->want_write) _sx_event(s, event_WANT_WRITE, NULL); return s->want_read;}/** we can write */static int _sx_get_pending_write(sx_t s) { sx_buf_t in, out; int ret; assert(s != NULL); if (s->wbufpending != NULL) { /* there's already a pending buffer ready to write */ return 0; } /* get the first buffer off the queue */ in = jqueue_pull(s->wbufq); if(in == NULL) { /* if there was a write event, and something is interested, we still have to tell the plugins */ in = _sx_buffer_new(NULL, 0, NULL, NULL); } /* if there's more to write, we want to make sure we get it */ s->want_write = jqueue_size(s->wbufq); /* make a copy for processing */ out = _sx_buffer_new(in->data, in->len, in->notify, in->notify_arg); _sx_debug(ZONE, "encoding %d bytes for writing: %.*s", in->len, in->len, in->data); /* run it by the plugins */ ret = _sx_chain_io_write(s, out); if(ret <= 0) { /* TODO/!!!: Are we leaking the 'out' buffer here? How about the 'in' buffer? */ if(ret == -1) { /* temporary failure, push it back on the queue */ jqueue_push(s->wbufq, in, (s->wbufq->front != NULL) ? s->wbufq->front->priority : 0); s->want_write = 1; } else if(ret == -2) { /* permanent failure, its all over */ /* !!! shut down */ s->want_read = s->want_write = 0; return -1; } /* done */ return 0; } _sx_buffer_free(in); if (out->len == 0) /* if there's nothing to write, then we're done */ _sx_buffer_free(out); else s->wbufpending = out; return 0;}int sx_can_write(sx_t s) { sx_buf_t out; int ret, written; assert((int) s); /* do we care? */ if(!s->want_write && s->state < state_CLOSING) return 0; /* no more thanks */ _sx_debug(ZONE, "%d ready for writing", s->tag); ret = _sx_get_pending_write(s); if (ret < 0) { /* fatal error */ /* !!! shut down */ return 0; } /* if there's nothing to write, then we're done */ if(s->wbufpending == NULL) { if(s->want_read) _sx_event(s, event_WANT_READ, NULL); return s->want_write; } out = s->wbufpending; s->wbufpending = NULL; /* get the callback to do the write */ _sx_debug(ZONE, "handing app %d bytes to write", out->len); written = _sx_event(s, event_WRITE, (void *) out); if(written < 0) { /* bail if something went wrong */ _sx_buffer_free(out); s->want_read = 0; s->want_write = 0; return 0; } else if(written < out->len) { /* if not fully written, this buffer is still pending */ out->len -= written; out->data += written; s->wbufpending = out; s->want_write ++; } else { /* notify */ if(out->notify != NULL) (out->notify)(s, out->notify_arg); /* done with this */ _sx_buffer_free(out); } /* if we've written everything, and we're closed, then inform the app it can kill us */ if(s->want_write == 0 && s->state == state_CLOSING) { _sx_state(s, state_CLOSED); _sx_event(s, event_CLOSED, NULL); return 0; } if(s->state == state_CLOSED) return 0; if(s->want_read) _sx_event(s, event_WANT_READ, NULL); return s->want_write;}/** send a new nad out */int _sx_nad_write(sx_t s, nad_t nad, int elem) { char *out; int len; /* silently drop it if we're closing or closed */ if(s->state >= state_CLOSING) { log_debug(ZONE, "stream closed, dropping outgoing packet"); nad_free(nad); return 1; } /* run it through the plugins */ if(_sx_chain_nad_write(s, nad, elem) == 0) return 1; /* serialise it */ nad_print(nad, elem, &out, &len); _sx_debug(ZONE, "queueing for write: %.*s", len, out); /* ready to go */ jqueue_push(s->wbufq, _sx_buffer_new(out, len, NULL, NULL), 0); nad_free(nad); /* things to write */ s->want_write = 1; return 0;}/** app version */void sx_nad_write_elem(sx_t s, nad_t nad, int elem) { assert((int) s); assert((int) nad); if(_sx_nad_write(s, nad, elem) == 1) return; /* things to write */ s->want_write = 1; _sx_event(s, event_WANT_WRITE, NULL); if(s->want_read) _sx_event(s, event_WANT_READ, NULL);}/** send raw data out */int _sx_raw_write(sx_t s, char *buf, int len) { /* siltently drop it if we're closing or closed */ if(s->state >= state_CLOSING) { log_debug(ZONE, "stream closed, dropping outgoing raw data"); return 1; } _sx_debug(ZONE, "queuing for write: %.*s", len, buf); /* ready to go */ jqueue_push(s->wbufq, _sx_buffer_new(buf, len, NULL, NULL), 0); /* things to write */ s->want_write = 1; return 0;}/** app version */void sx_raw_write(sx_t s, char *buf, int len) { assert((int) s); assert((int) buf); assert(len); if(_sx_raw_write(s, buf, len) == 1) return; /* things to write */ s->want_write = 1; _sx_event(s, event_WANT_WRITE, NULL); if(s->want_read) _sx_event(s, event_WANT_READ, NULL);}/** close a stream */void _sx_close(sx_t s) { /* close the stream if necessary */ if(s->state >= state_STREAM_SENT) { jqueue_push(s->wbufq, _sx_buffer_new("</stream:stream>", 16, NULL, NULL), 0); s->want_write = 1; } _sx_state(s, state_CLOSING);}void sx_close(sx_t s) { assert((int) s); if(s->state >= state_CLOSING) return; if(s->state >= state_STREAM_SENT && s->state < state_CLOSING) { _sx_close(s); _sx_event(s, event_WANT_WRITE, NULL); } else { _sx_state(s, state_CLOSED); _sx_event(s, event_CLOSED, NULL); }}void sx_kill(sx_t s) { assert((int) s); _sx_state(s, state_CLOSED); _sx_event(s, event_CLOSED, NULL);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -