📄 mio.c
字号:
/* write the current buffer */ ret = _mio_write_dump(cur); /* if an error occured */ if(ret == -1) { mio_close(cur); continue; /* loop on the same socket to kill it for real */ } /* if we are done writing */ else if(ret == 0) FD_CLR(cur->fd, &all_wfds); /* if we still have more to write */ else if(ret == 1) FD_SET(cur->fd, &all_wfds); } /* we may have wanted the socket closed after this operation */ if(cur->state == state_CLOSE) continue; /* loop on the same socket to kill it for real */ /* check the next socket */ cur = cur->next; } /* XXX * *tsbandit pokes jer: Why are we doing this again? i forget * yes, spin through the entire list again, * otherwise you can't write to a socket * from another socket's read call) if * there are packets to be written, wait * for a write slot */ for(cur = mio__data->master__list; cur != NULL; cur = cur->next) if(cur->queue != NULL) FD_SET(cur->fd, &all_wfds); else FD_CLR(cur->fd, &all_wfds); }}/***************************************************\* E X T E R N A L F U N C T I O N S *\***************************************************//* starts the _mio_main() loop*/void mio_init(void){ pool p; pth_attr_t attr; xmlnode io = xmlnode_get_tag(greymatter__, "io"); xmlnode karma = xmlnode_get_tag(io, "karma");#ifdef HAVE_SSL if(xmlnode_get_tag(io, "ssl") != NULL) mio_ssl_init(xmlnode_get_tag(io, "ssl"));#endif /* where to bounce HTTP requests to */ mio__bounce_uri = xmlnode_get_tag_data(io, "bounce"); if(mio__data == NULL) { register_beat(KARMA_HEARTBEAT, _karma_heartbeat, NULL); /* malloc our instance object */ p = pool_new(); mio__data = pmalloco(p, sizeof(_ios)); mio__data->p = p; mio__data->k = karma_new(p); pipe(mio__data->zzz); /* start main accept/read/write thread */ attr = pth_attr_new(); pth_attr_set(attr,PTH_ATTR_JOINABLE,FALSE);#ifdef __CYGWIN__ pth_attr_set(attr,PTH_ATTR_STACK_SIZE, 128*1024);#endif mio__data->t=pth_spawn(attr,(void*)_mio_main,NULL); pth_attr_destroy(attr); /* give time to init the signal handlers */ pth_yield(NULL); } if(karma != NULL) { mio__data->k->val = j_atoi(xmlnode_get_tag_data(karma, "init"), KARMA_INIT); mio__data->k->max = j_atoi(xmlnode_get_tag_data(karma, "max"), KARMA_MAX); mio__data->k->inc = j_atoi(xmlnode_get_tag_data(karma, "inc"), KARMA_INC); mio__data->k->dec = j_atoi(xmlnode_get_tag_data(karma, "dec"), KARMA_DEC); mio__data->k->penalty = j_atoi(xmlnode_get_tag_data(karma, "penalty"), KARMA_PENALTY); mio__data->k->restore = j_atoi(xmlnode_get_tag_data(karma, "restore"), KARMA_RESTORE); mio__data->k->reset_meter = j_atoi(xmlnode_get_tag_data(karma, "resetmeter"), KARMA_RESETMETER); } mio__data->rate_t = j_atoi(xmlnode_get_attrib(xmlnode_get_tag(io, "rate"), "time"), 0); mio__data->rate_p = j_atoi(xmlnode_get_attrib(xmlnode_get_tag(io, "rate"), "points"), 0);}/* * Cleanup function when server is shutting down, closes * all sockets, so that everything can be cleaned up * properly. */void mio_stop(void){ mio cur, mnext; log_debug2(ZONE, LOGT_CLEANUP, "MIO is shutting down"); /* no need to do anything if mio__data hasn't been used yet */ if(mio__data == NULL) return; /* flag that it is okay to exit the loop */ mio__data->shutdown = 1; /* loop each socket, and close it */ for(cur = mio__data->master__list; cur != NULL;) { mnext = cur->next; _mio_close(cur); cur = mnext; } /* signal the loop to end */ pth_abort(mio__data->t); pool_free(mio__data->p); mio__data = NULL;}/* creates a new mio object from a file descriptor*/mio mio_new(int fd, void *cb, void *arg, mio_handlers mh){ mio new = NULL; pool p = NULL; int flags = 0; if(fd <= 0) return NULL; /* create the new MIO object */ p = pool_new(); new = pmalloco(p, sizeof(_mio)); new->p = p; new->type = type_NORMAL; new->state = state_ACTIVE; new->fd = fd; new->cb = (void*)cb; new->cb_arg = arg; mio_set_handlers(new, mh); /* set the default karma values */ mio_karma2(new, mio__data->k); mio_rate(new, mio__data->rate_t, mio__data->rate_p); /* set the socket to non-blocking */ flags = fcntl(fd, F_GETFL, 0); flags |= O_NONBLOCK; fcntl(fd, F_SETFL, flags); /* add to the select loop */ _mio_link(new); /* notify the select loop */ if(mio__data != NULL) { log_debug2(ZONE, LOGT_EXECFLOW, "sending zzz notify to the select loop in mio_new()"); /* if there has been already sent a signal, that is not yet processed, we don't * have to send this signal twice. Else we could get blocking here at the write() call * if we send really many signals, what seems to be possible for large rosters as * reported by Marco Balmer. I have yet really tried to reproduce this, but it seems * logical and I don't see where it can hurt to send only one signal. * I have also considered using pth_write() here, but as I remember, there was a reason * why a real write is used here. It would be really nice, if pth would be documented * better ... and it would be even nicer not to use pth at all ... */ if (mio__data->zzz_active <= 0) { mio__data->zzz_active++; write(mio__data->zzz[1]," ",1); log_debug2(ZONE, LOGT_EXECFLOW, "notify sent"); } } return new;}/* resets the callback function*/mio mio_reset(mio m, void *cb, void *arg){ if(m == NULL) return NULL; m->cb = cb; m->cb_arg = arg; return m;}/* * client call to close the socket */void mio_close(mio m) { if(m == NULL) return; m->state = state_CLOSE; if(mio__data != NULL) { log_debug2(ZONE, LOGT_EXECFLOW, "sending zzz notify to the select loop in mio_close()"); /* there needs to be only one pending signal */ if (mio__data->zzz_active <= 0) { mio__data->zzz_active++; write(mio__data->zzz[1]," ",1); log_debug2(ZONE, LOGT_EXECFLOW, "notify sent"); } }}/* * writes a str, or xmlnode to the client socket */void mio_write(mio m, xmlnode x, char *buffer, int len){ mio_wbq new; pool p; if(m == NULL) return; /* if there is nothing to write */ if(x == NULL && buffer == NULL) { log_debug2("mio", LOGT_IO|LOGT_STRANGE, "[%s] mio_write called without x or buffer", ZONE); return; } /* create the pool for this wbq */ if(x != NULL) p = xmlnode_pool(x); else p = pool_new(); /* create the wbq */ new = pmalloco(p, sizeof(_mio_wbq)); new->p = p; /* set the queue item type */ if(buffer != NULL) { new->type = queue_CDATA; if (len == -1) len = strlen(buffer); /* XXX more hackish code to print the stream header right on a NUL xmlnode socket */ if(m->type == type_NUL && strncmp(buffer,"<?xml ",6) == 0) { new->data = pmalloco(p,len+2); memcpy(new->data,buffer,len); memcpy((new->data + len) - 1, "/>",3); len++; /* THIS WAS DUMB, I'm just leaving it here to remind me of how dumb it was :) sprintf(new->data,"%.*s/>",len-2,buffer); */ }else{ new->data = pmalloco(p,len+1); memcpy(new->data,buffer,len); } } else { new->type = queue_XMLNODE; if((new->data = xmlnode2str(x)) == NULL) { pool_free(p); return; } len = strlen(new->data); } /* include the \0 if we're special */ if(m->type == type_NUL) { len++; } /* assign values */ new->x = x; new->cur = new->data; new->len = len; /* put at end of queue */ if(m->tail == NULL) m->queue = new; else m->tail->next = new; m->tail = new; log_debug2(ZONE, LOGT_IO, "mio_write called on x: %X buffer: %.*s", x, len, buffer); /* notify the select loop that a packet needs writing */ if(mio__data != NULL) { log_debug2(ZONE, LOGT_EXECFLOW, "sending zzz notify to the select loop in mio_write()"); /* there only needs to be one pending signal */ if (mio__data->zzz_active <= 0) { mio__data->zzz_active++; write(mio__data->zzz[1]," ",1); log_debug2(ZONE, LOGT_EXECFLOW, "notify sent"); } }}/* sets karma values*/void mio_karma(mio m, int val, int max, int inc, int dec, int penalty, int restore){ if(m == NULL) return; m->k.val = val; m->k.max = max; m->k.inc = inc; m->k.dec = dec; m->k.penalty = penalty; m->k.restore = restore;}void mio_karma2(mio m, struct karma *k){ if(m == NULL) return; karma_copy(&m->k, k);}/* sets connection rate limits*/void mio_rate(mio m, int rate_time, int max_points){ if(m == NULL || rate_time == 0) return; m->rated = 1; if(m->rate != NULL) jlimit_free(m->rate); m->rate = jlimit_new(rate_time, max_points);}/* pops the last xmlnode from the queue */xmlnode mio_cleanup(mio m){ mio_wbq cur; if(m == NULL || m->queue == NULL) return NULL; /* find the first queue item with a xmlnode attached */ for(cur = m->queue; cur != NULL;) { /* move the queue up */ m->queue = cur->next; /* set the tail pointer if needed */ if(m->queue == NULL) m->tail = NULL; /* if there is no node attached */ if(cur->x == NULL) { /* just kill this item, and move on.. * only pop xmlnodes */ mio_wbq next = m->queue; pool_free(cur->p); cur = next; continue; } /* and pop this xmlnode */ return cur->x; } /* no xmlnodes found */ return NULL;}/* * request to connect to a remote host */void mio_connect(char *host, int port, void *cb, void *cb_arg, int timeout, mio_connect_func f, mio_handlers mh){ connect_data cd = NULL; pool p = NULL; pth_attr_t attr; /* verify data */ if(host == NULL || port == 0) return; if(timeout <= 0) timeout = 30; /* default timeout */ if(f == NULL) f = MIO_RAW_CONNECT; if(mh == NULL) mh = mio_handlers_new(NULL, NULL, NULL); /* create the connect struct */ p = pool_new(); cd = pmalloco(p, sizeof(_connect_data)); cd->p = p; cd->ip = pstrdup(p, host); cd->port = port; cd->cb = cb; cd->cb_arg = cb_arg; cd->cf = f; cd->mh = mh;#ifdef WITH_IPV6 if(!strchr(host,':')) { char *temp = pmalloco(p, strlen(host)+8); strcpy(temp, "::ffff:"); strcat(temp, host); host = temp; }#endif attr = pth_attr_new(); pth_attr_set(attr,PTH_ATTR_JOINABLE,FALSE); cd->t = pth_spawn(attr, (void*)_mio_connect, (void*)cd); pth_attr_destroy(attr); register_beat(timeout, _mio_connect_timeout, (void*)cd);}/* * call to start listening with select */mio mio_listen(int port, char *listen_host, void *cb, void *arg, mio_accept_func f, mio_handlers mh){ mio new; int fd; if(f == NULL) f = MIO_RAW_ACCEPT; if(mh == NULL) mh = mio_handlers_new(NULL, NULL, NULL); mh->accept = f; log_debug2(ZONE, LOGT_IO, "mio to listen on %d [%s]",port, listen_host); /* attempt to open a listening socket */ fd = make_netsocket(port, listen_host, NETSOCKET_SERVER); /* if we got a bad fd we can't listen */ if(fd < 0) { log_alert(NULL, "mio unable to listen on %d [%s]: jabberd already running or invalid interface?", port, listen_host); return NULL; } /* start listening with a max accept queue of 10 */ if(listen(fd, 10) < 0) { log_alert(NULL, "mio unable to listen on %d [%s]: jabberd already running or invalid interface?", port, listen_host); return NULL; } /* create the sock object, and assign the values */ new = mio_new(fd, cb, arg, mh); new->type = type_LISTEN; new->ip = pstrdup(new->p, listen_host); log_debug2(ZONE, LOGT_IO, "mio starting to listen on %d [%s]", port, listen_host); return new;}mio_handlers mio_handlers_new(mio_read_func rf, mio_write_func wf, mio_parser_func pf){ pool p = pool_new(); mio_handlers new; new = pmalloco(p, sizeof(_mio_handlers)); new->p = p; /* yay! a chance to use the tertiary operator! */ new->read = rf ? rf : MIO_RAW_READ; new->write = wf ? wf : MIO_RAW_WRITE; new->parser = pf ? pf : MIO_RAW_PARSER; return new;}void mio_handlers_free(mio_handlers mh){ if(mh == NULL) return; pool_free(mh->p);}void mio_set_handlers(mio m, mio_handlers mh){ mio_handlers old; if(m == NULL || mh == NULL) return; old = m->mh; m->mh = mh; mio_handlers_free(old);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -