📄 mio.c
字号:
/* take it off the master__list */ _mio_unlink(m); /* try to write what's in the queue */ if(m->queue != NULL) ret = _mio_write_dump(m); if(ret == 1) /* still more data, bounce it all */ if(m->cb != NULL) (*(mio_std_cb)m->cb)(m, MIO_ERROR, m->cb); /* notify of the close */ if(m->cb != NULL) (*(mio_std_cb)m->cb)(m, MIO_CLOSED, m->cb_arg); /* close the socket, and free all memory */ close(m->fd); if(m->rated) jlimit_free(m->rate); pool_free(m->mh->p); /* cleanup the write queue */ while((cur = mio_cleanup(m)) != NULL) xmlnode_free(cur); pool_free(m->p); log_debug2(ZONE, LOGT_IO, "freed MIO socket");}/** * accept an incoming connection from a listen sock * * @param m the socket on which we want to accept a connection * @return the new mio handle for the new connection */mio _mio_accept(mio m){#ifdef WITH_IPV6 struct sockaddr_in6 serv_addr; char addr_str[INET6_ADDRSTRLEN];#else struct sockaddr_in serv_addr;#endif size_t addrlen = sizeof(serv_addr); int fd; int allow, deny; mio new; log_debug2(ZONE, LOGT_IO, "_mio_accept calling accept on fd #%d", m->fd); /* pull a socket off the accept queue */ fd = (*m->mh->accept)(m, (struct sockaddr*)&serv_addr, (socklen_t*)&addrlen); if(fd <= 0) { return NULL; }#ifdef WITH_IPV6 allow = _mio_allow_check(inet_ntop(AF_INET6, &serv_addr.sin6_addr, addr_str, sizeof(addr_str))); deny = _mio_deny_check(addr_str);#else allow = _mio_allow_check(inet_ntoa(serv_addr.sin_addr)); deny = _mio_deny_check(inet_ntoa(serv_addr.sin_addr));#endif if(deny >= allow) {#ifdef WITH_IPV6 log_warn("mio", "%s was denied access, due to the allow list of IPs", addr_str);#else log_warn("mio", "%s was denied access, due to the allow list of IPs", inet_ntoa(serv_addr.sin_addr));#endif close(fd); return NULL; } /* make sure that we aren't rate limiting this IP */#ifdef WITH_IPV6 if(m->rated && jlimit_check(m->rate, addr_str, 1)) { log_warn(NULL, "%s(%d) is being connection rate limited - the connection attempts from this IP exceed the rate limit defined in jabberd config", addr_str, fd);#else if(m->rated && jlimit_check(m->rate, inet_ntoa(serv_addr.sin_addr), 1)) { log_warn(NULL, "%s(%d) is being connection rate limited - the connection attempts from this IP exceed the rate limit defined in jabberd config", inet_ntoa(serv_addr.sin_addr), fd);#endif close(fd); return NULL; }#ifdef WITH_IPV6 log_debug2(ZONE, LOGT_IO, "new socket accepted (fd: %d, ip%s, port: %d)", fd, addr_str, ntohs(serv_addr.sin6_port));#else log_debug2(ZONE, LOGT_IO, "new socket accepted (fd: %d, ip: %s, port: %d)", fd, inet_ntoa(serv_addr.sin_addr), ntohs(serv_addr.sin_port));#endif /* create a new sock object for this connection */ new = mio_new(fd, m->cb, m->cb_arg, mio_handlers_new(m->mh->read, m->mh->write, m->mh->parser));#ifdef WITH_IPV6 new->ip = pstrdup(new->p, addr_str);#else new->ip = pstrdup(new->p, inet_ntoa(serv_addr.sin_addr));#endif#ifdef HAVE_SSL new->ssl = m->ssl; /* XXX temas: This is so messy, but I can't see a better way since I can't * hook into the mio_cleanup routines. MIO still needs some * work. */ pool_cleanup(new->p, _mio_ssl_cleanup, (void *)new->ssl);#endif mio_karma2(new, &m->k); if(m->cb != NULL) (*(mio_std_cb)new->cb)(new, MIO_NEW, new->cb_arg); return new;}/* raise a signal on the connecting thread to time it out */result _mio_connect_timeout(void *arg){ connect_data cd = (connect_data)arg; if(cd->connected) { pool_free(cd->p); return r_UNREG; } log_debug2(ZONE, LOGT_IO, "mio_connect taking too long connecting to %s, signaling to stop", cd->ip); if(cd->t != NULL) pth_raise(cd->t, SIGUSR2); return r_DONE; /* loop again */}void _mio_connect(void *arg){ connect_data cd = (connect_data)arg;#ifdef WITH_IPV6 struct sockaddr_in6 sa; struct in6_addr *saddr;#else struct sockaddr_in sa; struct in_addr *saddr;#endif int flag = 1, flags; mio new; pool p; sigset_t set; sigemptyset(&set); sigaddset(&set, SIGUSR2); pth_sigmask(SIG_BLOCK, &set, NULL); bzero((void*)&sa, sizeof(sa)); /* create the new mio object, can't call mio_new.. don't want it in select yet */ p = pool_new(); new = pmalloco(p, sizeof(_mio)); new->p = p; new->type = type_NORMAL; new->state = state_ACTIVE; new->ip = pstrdup(p,cd->ip); new->cb = (void*)cd->cb; new->cb_arg = cd->cb_arg; mio_set_handlers(new, cd->mh); /* create a socket to connect with */#ifdef WITH_IPV6 new->fd = socket(PF_INET6, SOCK_STREAM,0);#else new->fd = socket(PF_INET, SOCK_STREAM,0);#endif /* set socket options */ if(new->fd < 0 || setsockopt(new->fd, SOL_SOCKET, SO_REUSEADDR, (char*)&flag, sizeof(flag)) < 0) { /* get the error message */ new->connect_errmsg = strerror(errno); if(cd->cb != NULL) (*(mio_std_cb)cd->cb)(new, MIO_CLOSED, cd->cb_arg); cd->connected = -1; mio_handlers_free(new->mh); if(new->fd > 0) close(new->fd); pool_free(p); return; } /* optionally bind to a local address */ if(xmlnode_get_tag_data(greymatter__, "io/bind") != NULL) {#ifdef WITH_IPV6 struct sockaddr_in6 sa; char *addr_str = xmlnode_get_tag_data(greymatter__, "io/bind"); char temp_addr[INET6_ADDRSTRLEN]; struct in_addr tmp; if (inet_pton(AF_INET, addr_str, &tmp)) { strcpy(temp_addr, "::ffff:"); strcat(temp_addr, addr_str); addr_str = temp_addr; } sa.sin6_family = AF_INET6; sa.sin6_port = 0; sa.sin6_flowinfo = 0; inet_pton(AF_INET6, addr_str, &sa.sin6_addr);#else struct sockaddr_in sa; sa.sin_family = AF_INET; sa.sin_port = 0; inet_aton(xmlnode_get_tag_data(greymatter__, "io/bind"), &sa.sin_addr);#endif bind(new->fd, (struct sockaddr*)&sa, sizeof(sa)); }#ifdef WITH_IPV6 saddr = make_addr_ipv6(cd->ip);#else saddr = make_addr(cd->ip);#endif if(saddr == NULL) { new->connect_errmsg = "Could not resolve hostname or parse IP address"; if(cd->cb != NULL) (*(mio_std_cb)cd->cb)(new, MIO_CLOSED, cd->cb_arg); cd->connected = -1; mio_handlers_free(new->mh); if(new->fd > 0) close(new->fd); pool_free(p); return; }#ifdef WITH_IPV6 sa.sin6_family = AF_INET6; sa.sin6_port = htons(cd->port); sa.sin6_addr = *saddr;#else sa.sin_family = AF_INET; sa.sin_port = htons(cd->port); sa.sin_addr.s_addr = saddr->s_addr;#endif log_debug2(ZONE, LOGT_IO, "calling the connect handler for mio object %X", new); if((*cd->cf)(new, (struct sockaddr*)&sa, sizeof sa) < 0) { /* get the error message */ new->connect_errmsg = strerror(errno); if(cd->cb != NULL) (*(mio_std_cb)cd->cb)(new, MIO_CLOSED, cd->cb_arg); cd->connected = -1; if(new->fd > 0) close(new->fd); mio_handlers_free(new->mh); pool_free(p); return; } new->connect_errmsg = ""; /* set the socket to non-blocking */ flags = fcntl(new->fd, F_GETFL, 0); flags |= O_NONBLOCK; fcntl(new->fd, F_SETFL, flags); /* XXX pthreads race condition.. cd->connected may be checked in the timeout, and cd freed before these calls */ /* set the default karma values */ mio_karma2(new, mio__data->k); /* add to the select loop */ _mio_link(new); cd->connected = 1; /* notify the select loop */ if(mio__data != NULL) { /* we don't have to send multiple signals */ if (mio__data->zzz_active <= 0) { mio__data->zzz_active++; pth_write(mio__data->zzz[1]," ",1); } } /* notify the client that the socket is born */ if(new->cb != NULL) (*(mio_std_cb)new->cb)(new, MIO_NEW, new->cb_arg);}/* * main select loop thread */void _mio_main(void *arg){ fd_set wfds, /* fd set for current writes */ rfds, /* fd set for current reads */ all_wfds, /* fd set for all writes */ all_rfds; /* fd set for all reads */ mio cur, temp; char buf[8192]; /* max socket read buffer */ int maxlen, len, retval, bcast=-1, maxfd=0;#ifdef WITH_IPV6 char addr_str[INET6_ADDRSTRLEN];#endif log_debug2(ZONE, LOGT_INIT, "MIO is starting up"); /* init the socket junk */ maxfd = mio__data->zzz[0]; FD_ZERO(&all_wfds); FD_ZERO(&all_rfds); /* the optional local broadcast receiver */ if(xmlnode_get_tag(greymatter__,"io/announce") != NULL) { bcast = make_netsocket(j_atoi(xmlnode_get_attrib(xmlnode_get_tag(greymatter__,"io/announce"),"port"),5222),NULL,NETSOCKET_UDP); if(bcast < 0) { log_notice("mio","failed to create network announce handler socket"); }else if(bcast > maxfd){ maxfd = bcast; } log_debug2(ZONE, LOGT_IO|LOGT_INIT, "started announcement handler"); } /* loop forever -- will only exit when * mio__data->master__list is NULL */ while (1) { /* reset the local errno */ mio__errno = 0; rfds = all_rfds; wfds = all_wfds; log_debug2(ZONE, LOGT_EXECFLOW, "mio while loop top"); /* if we are closing down, exit the loop */ if(mio__data->shutdown == 1 && mio__data->master__list == NULL) break; /* wait for a socket event */ FD_SET(mio__data->zzz[0],&rfds); /* include our wakeup socket */ if(bcast > 0) FD_SET(bcast,&rfds); /* optionally include our announcements socket */ retval = pth_select(maxfd+1, &rfds, &wfds, NULL, NULL); /* if retval is -1, fd sets are undefined across all platforms */ log_debug2(ZONE, LOGT_EXECFLOW, "mio while loop, working"); /* reset maxfd, in case it changes */ maxfd=mio__data->zzz[0]; /* check our zzz */ if(FD_ISSET(mio__data->zzz[0],&rfds)) { log_debug2(ZONE, LOGT_EXECFLOW, "got a notify on zzz"); pth_read(mio__data->zzz[0],buf,8192); mio__data->zzz_active = 0; } /* check our pending announcements */ if(bcast > 0 && FD_ISSET(bcast,&rfds)) {#ifdef WITH_IPV6 struct sockaddr_in6 remote_addr;#else struct sockaddr_in remote_addr;#endif int addrlen = sizeof(remote_addr); xmlnode curx; curx = xmlnode_get_firstchild(xmlnode_get_tag(greymatter__,"io/announce")); /* XXX pth <1.4 doesn't have pth_* wrapper for recvfrom or sendto! */ len = recvfrom(bcast,buf,8192,0,(struct sockaddr*)&remote_addr,&addrlen);#ifdef WITH_IPV6 log_debug2(ZONE, LOGT_IO, "ANNOUNCER: received some data from %s: %.*s",inet_ntop(AF_INET, &remote_addr.sin6_addr, addr_str, sizeof(addr_str)),len,buf);#else log_debug2(ZONE, LOGT_IO, "ANNOUNCER: received some data from %s: %.*s",inet_ntoa(remote_addr.sin_addr),len,buf);#endif /* sending our data out */ for(; curx != NULL; curx = xmlnode_get_nextsibling(curx)) { if(xmlnode_get_type(curx) != NTYPE_TAG) continue; len = snprintf(buf,8192,"%s",xmlnode2str(curx)); log_debug2(ZONE, LOGT_IO, "announcement packet: %.*s",len,buf); sendto(bcast,buf,len,0,(struct sockaddr*)&remote_addr,addrlen); } } /* loop through the sockets, check for stuff to do */ for(cur = mio__data->master__list; cur != NULL;) { /* if this socket needs to close */ if(cur->state == state_CLOSE) { temp = cur; cur = cur->next; FD_CLR(temp->fd, &all_rfds); FD_CLR(temp->fd, &all_wfds); _mio_close(temp); continue; } /* find the max fd */ if(cur->fd > maxfd) maxfd = cur->fd; /* if the sock is not in the read set, and has good karma and we got a non read event, * or if we need to initialize this socket */ if(cur->k.init == 0 || (!FD_ISSET(cur->fd, &all_rfds) && cur->k.val >= 0) ) { if(cur->k.init == 0) { /* set to intialized */ cur->k.init = 1; log_debug2(ZONE, LOGT_IO, "socket %d has been intialized with starting karma %d ", cur->fd, cur->k.val); } else { /* reset the karma to restore val */ log_debug2(ZONE, LOGT_IO, "socket %d has restore karma %d byte meter %d", cur->fd, cur->k.val, cur->k.bytes); } /* and make sure that they are in the read set */ FD_SET(cur->fd,&all_rfds); } /* pause while the rest of jabberd catches up */ pth_yield(NULL); if(retval == -1) { /* we can't check anything else, and be XP on all platforms here.. */ cur = cur->next; continue; } /* if this socket needs to be read from */ if(FD_ISSET(cur->fd, &rfds)) /* do not read if select returned error */ { /* new connection */ if(cur->type == type_LISTEN) { mio m = _mio_accept(cur); /* this is needed here too */ if(cur->fd > maxfd) maxfd = cur->fd; if(m != NULL) { FD_SET(m->fd, &all_rfds); if(m->fd > maxfd) maxfd=m->fd; } cur = cur->next; continue; } do { maxlen = KARMA_READ_MAX(cur->k.val); if(maxlen > 8191) maxlen = 8191; len = (*(cur->mh->read))(cur, buf, maxlen); /* if we had a bad read */ if(len == 0 && maxlen > 0) { mio_close(cur); continue; /* loop on the same socket to kill it for real */ } else if(len < 0) { if(errno != EWOULDBLOCK && errno != EINTR && errno != EAGAIN && mio__errno != EAGAIN) { /* kill this socket and move on */ mio_close(cur); continue; /* loop on the same socket to kill it for real */ } } else { if(cur->k.dec != 0) { /* karma is enabled */ karma_decrement(&cur->k, len); /* Check if that socket ran out of karma */ if(cur->k.val <= 0) { /* ran out of karma */ log_notice("MIO_XML_READ", "socket from %s is out of karma: traffic exceeds karma limits defined in jabberd configuration", cur->ip); FD_CLR(cur->fd, &all_rfds); /* this fd is being punished */ } } buf[len] = '\0'; log_debug2(ZONE, LOGT_IO, "MIO read from socket %d: %s", cur->fd, buf); (*cur->mh->parser)(cur, buf, len); } } while (mio__ssl_reread == 1); } /* we could have gotten a bad parse, and want to close */ if(cur->state == state_CLOSE) { /* loop again to close the socket */ continue; } /* if we need to write to this socket */ if((FD_ISSET(cur->fd, &wfds) || cur->queue != NULL)) { int ret;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -