⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mio.c

📁 jabber server jabber server jabber server jabber server
💻 C
📖 第 1 页 / 共 3 页
字号:
    /* take it off the master__list */    _mio_unlink(m);    /* try to write what's in the queue */    if(m->queue != NULL)         ret = _mio_write_dump(m);    if(ret == 1) /* still more data, bounce it all */        if(m->cb != NULL)            (*(mio_std_cb)m->cb)(m, MIO_ERROR, m->cb);    /* notify of the close */    if(m->cb != NULL)        (*(mio_std_cb)m->cb)(m, MIO_CLOSED, m->cb_arg);    /* close the socket, and free all memory */    close(m->fd);    if(m->rated)         jlimit_free(m->rate);    pool_free(m->mh->p);    /* cleanup the write queue */    while((cur = mio_cleanup(m)) != NULL)        xmlnode_free(cur);    pool_free(m->p);    log_debug2(ZONE, LOGT_IO, "freed MIO socket");}/**  * accept an incoming connection from a listen sock * * @param m the socket on which we want to accept a connection * @return the new mio handle for the new connection */mio _mio_accept(mio m){#ifdef WITH_IPV6    struct sockaddr_in6 serv_addr;    char addr_str[INET6_ADDRSTRLEN];#else    struct sockaddr_in serv_addr;#endif    size_t addrlen = sizeof(serv_addr);    int fd;    int allow, deny;    mio new;    log_debug2(ZONE, LOGT_IO, "_mio_accept calling accept on fd #%d", m->fd);    /* pull a socket off the accept queue */    fd = (*m->mh->accept)(m, (struct sockaddr*)&serv_addr, (socklen_t*)&addrlen);    if(fd <= 0)    {        return NULL;    }#ifdef WITH_IPV6    allow = _mio_allow_check(inet_ntop(AF_INET6, &serv_addr.sin6_addr, addr_str, sizeof(addr_str)));    deny = _mio_deny_check(addr_str);#else    allow = _mio_allow_check(inet_ntoa(serv_addr.sin_addr));    deny  = _mio_deny_check(inet_ntoa(serv_addr.sin_addr));#endif    if(deny >= allow)    {#ifdef WITH_IPV6	log_warn("mio", "%s was denied access, due to the allow list of IPs", addr_str);#else        log_warn("mio", "%s was denied access, due to the allow list of IPs", inet_ntoa(serv_addr.sin_addr));#endif        close(fd);        return NULL;    }    /* make sure that we aren't rate limiting this IP */#ifdef WITH_IPV6    if(m->rated && jlimit_check(m->rate, addr_str, 1))    {	log_warn(NULL, "%s(%d) is being connection rate limited - the connection attempts from this IP exceed the rate limit defined in jabberd config", addr_str, fd);#else    if(m->rated && jlimit_check(m->rate, inet_ntoa(serv_addr.sin_addr), 1))    {        log_warn(NULL, "%s(%d) is being connection rate limited - the connection attempts from this IP exceed the rate limit defined in jabberd config", inet_ntoa(serv_addr.sin_addr), fd);#endif        close(fd);        return NULL;    }#ifdef WITH_IPV6    log_debug2(ZONE, LOGT_IO, "new socket accepted (fd: %d, ip%s, port: %d)", fd, addr_str, ntohs(serv_addr.sin6_port));#else    log_debug2(ZONE, LOGT_IO, "new socket accepted (fd: %d, ip: %s, port: %d)", fd, inet_ntoa(serv_addr.sin_addr), ntohs(serv_addr.sin_port));#endif    /* create a new sock object for this connection */    new      = mio_new(fd, m->cb, m->cb_arg, mio_handlers_new(m->mh->read, m->mh->write, m->mh->parser));#ifdef WITH_IPV6    new->ip  = pstrdup(new->p, addr_str);#else    new->ip  = pstrdup(new->p, inet_ntoa(serv_addr.sin_addr));#endif#ifdef HAVE_SSL    new->ssl = m->ssl;        /* XXX temas:  This is so messy, but I can't see a better way since I can't     *             hook into the mio_cleanup routines.  MIO still needs some     *             work.     */    pool_cleanup(new->p, _mio_ssl_cleanup, (void *)new->ssl);#endif    mio_karma2(new, &m->k);    if(m->cb != NULL)        (*(mio_std_cb)new->cb)(new, MIO_NEW, new->cb_arg);    return new;}/* raise a signal on the connecting thread to time it out */result _mio_connect_timeout(void *arg){    connect_data cd = (connect_data)arg;    if(cd->connected)    {        pool_free(cd->p);        return r_UNREG;    }    log_debug2(ZONE, LOGT_IO, "mio_connect taking too long connecting to %s, signaling to stop", cd->ip);    if(cd->t != NULL)        pth_raise(cd->t, SIGUSR2);    return r_DONE; /* loop again */}void _mio_connect(void *arg){    connect_data cd = (connect_data)arg;#ifdef WITH_IPV6    struct sockaddr_in6 sa;    struct in6_addr	*saddr;#else    struct sockaddr_in sa;    struct in_addr     *saddr;#endif    int                flag = 1,                       flags;    mio                new;    pool               p;    sigset_t           set;    sigemptyset(&set);    sigaddset(&set, SIGUSR2);    pth_sigmask(SIG_BLOCK, &set, NULL);    bzero((void*)&sa, sizeof(sa));    /* create the new mio object, can't call mio_new.. don't want it in select yet */    p           = pool_new();    new         = pmalloco(p, sizeof(_mio));    new->p      = p;    new->type   = type_NORMAL;    new->state  = state_ACTIVE;    new->ip     = pstrdup(p,cd->ip);    new->cb     = (void*)cd->cb;    new->cb_arg = cd->cb_arg;    mio_set_handlers(new, cd->mh);    /* create a socket to connect with */#ifdef WITH_IPV6    new->fd = socket(PF_INET6, SOCK_STREAM,0);#else    new->fd = socket(PF_INET, SOCK_STREAM,0);#endif    /* set socket options */    if(new->fd < 0 || setsockopt(new->fd, SOL_SOCKET, SO_REUSEADDR, (char*)&flag, sizeof(flag)) < 0) {	/* get the error message */	new->connect_errmsg = strerror(errno);	        if(cd->cb != NULL)            (*(mio_std_cb)cd->cb)(new, MIO_CLOSED, cd->cb_arg);        cd->connected = -1;        mio_handlers_free(new->mh);        if(new->fd > 0)            close(new->fd);        pool_free(p);        return;    }    /* optionally bind to a local address */    if(xmlnode_get_tag_data(greymatter__, "io/bind") != NULL) {#ifdef WITH_IPV6	struct sockaddr_in6 sa;	char *addr_str = xmlnode_get_tag_data(greymatter__, "io/bind");	char temp_addr[INET6_ADDRSTRLEN];	struct in_addr tmp;	if (inet_pton(AF_INET, addr_str, &tmp))	{	    strcpy(temp_addr, "::ffff:");	    strcat(temp_addr, addr_str);	    addr_str = temp_addr;	}	sa.sin6_family = AF_INET6;	sa.sin6_port = 0;	sa.sin6_flowinfo = 0;	inet_pton(AF_INET6, addr_str, &sa.sin6_addr);#else        struct sockaddr_in sa;        sa.sin_family = AF_INET;        sa.sin_port   = 0;        inet_aton(xmlnode_get_tag_data(greymatter__, "io/bind"), &sa.sin_addr);#endif        bind(new->fd, (struct sockaddr*)&sa, sizeof(sa));    }#ifdef WITH_IPV6    saddr = make_addr_ipv6(cd->ip);#else    saddr = make_addr(cd->ip);#endif    if(saddr == NULL) {	new->connect_errmsg = "Could not resolve hostname or parse IP address";        if(cd->cb != NULL)            (*(mio_std_cb)cd->cb)(new, MIO_CLOSED, cd->cb_arg);        cd->connected = -1;        mio_handlers_free(new->mh);        if(new->fd > 0)            close(new->fd);        pool_free(p);        return;    }#ifdef WITH_IPV6    sa.sin6_family = AF_INET6;    sa.sin6_port = htons(cd->port);    sa.sin6_addr = *saddr;#else    sa.sin_family = AF_INET;    sa.sin_port = htons(cd->port);    sa.sin_addr.s_addr = saddr->s_addr;#endif    log_debug2(ZONE, LOGT_IO, "calling the connect handler for mio object %X", new);    if((*cd->cf)(new, (struct sockaddr*)&sa, sizeof sa) < 0) {	/* get the error message */	new->connect_errmsg = strerror(errno);        if(cd->cb != NULL)            (*(mio_std_cb)cd->cb)(new, MIO_CLOSED, cd->cb_arg);        cd->connected = -1;        if(new->fd > 0)            close(new->fd);        mio_handlers_free(new->mh);        pool_free(p);        return;    }    new->connect_errmsg = "";    /* set the socket to non-blocking */    flags =  fcntl(new->fd, F_GETFL, 0);    flags |= O_NONBLOCK;    fcntl(new->fd, F_SETFL, flags);    /* XXX pthreads race condition.. cd->connected may be checked in the timeout, and cd freed before these calls */    /* set the default karma values */    mio_karma2(new, mio__data->k);        /* add to the select loop */    _mio_link(new);    cd->connected = 1;     /* notify the select loop */    if(mio__data != NULL) {	/* we don't have to send multiple signals */	if (mio__data->zzz_active <= 0) {	    mio__data->zzz_active++;	    pth_write(mio__data->zzz[1]," ",1);	}    }    /* notify the client that the socket is born */    if(new->cb != NULL)        (*(mio_std_cb)new->cb)(new, MIO_NEW, new->cb_arg);}/*  * main select loop thread  */void _mio_main(void *arg){    fd_set      wfds,       /* fd set for current writes   */                rfds,       /* fd set for current reads    */                all_wfds,   /* fd set for all writes       */                all_rfds;   /* fd set for all reads        */    mio         cur,                temp;        char        buf[8192]; /* max socket read buffer      */    int         maxlen,                len,                retval,                bcast=-1,                maxfd=0;#ifdef WITH_IPV6    char	addr_str[INET6_ADDRSTRLEN];#endif    log_debug2(ZONE, LOGT_INIT, "MIO is starting up");    /* init the socket junk */    maxfd = mio__data->zzz[0];    FD_ZERO(&all_wfds);    FD_ZERO(&all_rfds);    /* the optional local broadcast receiver */    if(xmlnode_get_tag(greymatter__,"io/announce") != NULL)    {        bcast = make_netsocket(j_atoi(xmlnode_get_attrib(xmlnode_get_tag(greymatter__,"io/announce"),"port"),5222),NULL,NETSOCKET_UDP);        if(bcast < 0)        {            log_notice("mio","failed to create network announce handler socket");        }else if(bcast > maxfd){            maxfd = bcast;        }        log_debug2(ZONE, LOGT_IO|LOGT_INIT, "started announcement handler");    }    /* loop forever -- will only exit when     * mio__data->master__list is NULL */    while (1)    {        /* reset the local errno */        mio__errno = 0;        rfds = all_rfds;        wfds = all_wfds;        log_debug2(ZONE, LOGT_EXECFLOW, "mio while loop top");        /* if we are closing down, exit the loop */        if(mio__data->shutdown == 1 && mio__data->master__list == NULL)            break;        /* wait for a socket event */        FD_SET(mio__data->zzz[0],&rfds); /* include our wakeup socket */        if(bcast > 0)            FD_SET(bcast,&rfds); /* optionally include our announcements socket */        retval = pth_select(maxfd+1, &rfds, &wfds, NULL, NULL);        /* if retval is -1, fd sets are undefined across all platforms */        log_debug2(ZONE, LOGT_EXECFLOW, "mio while loop, working");        /* reset maxfd, in case it changes */        maxfd=mio__data->zzz[0];        /* check our zzz */        if(FD_ISSET(mio__data->zzz[0],&rfds))        {	    log_debug2(ZONE, LOGT_EXECFLOW, "got a notify on zzz");            pth_read(mio__data->zzz[0],buf,8192);	    mio__data->zzz_active = 0;        }        /* check our pending announcements */        if(bcast > 0 && FD_ISSET(bcast,&rfds))        {#ifdef WITH_IPV6	    struct sockaddr_in6 remote_addr;#else            struct sockaddr_in remote_addr;#endif            int addrlen = sizeof(remote_addr);            xmlnode curx;            curx = xmlnode_get_firstchild(xmlnode_get_tag(greymatter__,"io/announce"));            /* XXX pth <1.4 doesn't have pth_* wrapper for recvfrom or sendto! */            len = recvfrom(bcast,buf,8192,0,(struct sockaddr*)&remote_addr,&addrlen);#ifdef WITH_IPV6            log_debug2(ZONE, LOGT_IO, "ANNOUNCER: received some data from %s: %.*s",inet_ntop(AF_INET, &remote_addr.sin6_addr, addr_str, sizeof(addr_str)),len,buf);#else            log_debug2(ZONE, LOGT_IO, "ANNOUNCER: received some data from %s: %.*s",inet_ntoa(remote_addr.sin_addr),len,buf);#endif            /* sending our data out */            for(; curx != NULL; curx = xmlnode_get_nextsibling(curx))            {                if(xmlnode_get_type(curx) != NTYPE_TAG) continue;                len = snprintf(buf,8192,"%s",xmlnode2str(curx));                log_debug2(ZONE, LOGT_IO, "announcement packet: %.*s",len,buf);                sendto(bcast,buf,len,0,(struct sockaddr*)&remote_addr,addrlen);            }        }        /* loop through the sockets, check for stuff to do */        for(cur = mio__data->master__list; cur != NULL;)        {            /* if this socket needs to close */            if(cur->state == state_CLOSE)            {                temp = cur;                cur = cur->next;                FD_CLR(temp->fd, &all_rfds);                FD_CLR(temp->fd, &all_wfds);                _mio_close(temp);                continue;            }            /* find the max fd */            if(cur->fd > maxfd)                maxfd = cur->fd;            /* if the sock is not in the read set, and has good karma and we got a non read event,             * or if we need to initialize this socket */            if(cur->k.init == 0 || (!FD_ISSET(cur->fd, &all_rfds) && cur->k.val >= 0) )            {                if(cur->k.init == 0)                {                    /* set to intialized */                    cur->k.init = 1;                    log_debug2(ZONE, LOGT_IO, "socket %d has been intialized with starting karma %d ", cur->fd, cur->k.val);                }                else                {                    /* reset the karma to restore val */                    log_debug2(ZONE, LOGT_IO, "socket %d has restore karma %d byte meter %d", cur->fd, cur->k.val, cur->k.bytes);                }                /* and make sure that they are in the read set */                FD_SET(cur->fd,&all_rfds);            }            /* pause while the rest of jabberd catches up */            pth_yield(NULL);            if(retval == -1)             { /* we can't check anything else, and be XP on all platforms here.. */                cur = cur->next;                continue;            }            /* if this socket needs to be read from */            if(FD_ISSET(cur->fd, &rfds)) /* do not read if select returned error */            {                /* new connection */                if(cur->type == type_LISTEN)                {                    mio m = _mio_accept(cur);                    /* this is needed here too */                    if(cur->fd > maxfd)                        maxfd = cur->fd;                    if(m != NULL)                    {                        FD_SET(m->fd, &all_rfds);                        if(m->fd > maxfd)                            maxfd=m->fd;                    }                    cur = cur->next;                    continue;                }                do                {                    maxlen = KARMA_READ_MAX(cur->k.val);                    if(maxlen > 8191) maxlen = 8191;                    len = (*(cur->mh->read))(cur, buf, maxlen);                    /* if we had a bad read */                    if(len == 0 && maxlen > 0)                    {                         mio_close(cur);                        continue; /* loop on the same socket to kill it for real */                    }                    else if(len < 0)                    {                        if(errno != EWOULDBLOCK && errno != EINTR &&                            errno != EAGAIN && mio__errno != EAGAIN)                         {                            /* kill this socket and move on */                            mio_close(cur);                            continue;  /* loop on the same socket to kill it for real */                        }                    }                    else                     {                        if(cur->k.dec != 0)                        { /* karma is enabled */                            karma_decrement(&cur->k, len);                            /* Check if that socket ran out of karma */                            if(cur->k.val <= 0)                            { /* ran out of karma */                                log_notice("MIO_XML_READ", "socket from %s is out of karma: traffic exceeds karma limits defined in jabberd configuration", cur->ip);                                FD_CLR(cur->fd, &all_rfds); /* this fd is being punished */                            }                        }                        buf[len] = '\0';                        log_debug2(ZONE, LOGT_IO, "MIO read from socket %d: %s", cur->fd, buf);                        (*cur->mh->parser)(cur, buf, len);                    }                }                while (mio__ssl_reread == 1);            }             /* we could have gotten a bad parse, and want to close */            if(cur->state == state_CLOSE)            { /* loop again to close the socket */                continue;            }            /* if we need to write to this socket */            if((FD_ISSET(cur->fd, &wfds) || cur->queue != NULL))            {                   int ret;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -