⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mdtcp-srv.c

📁 一个C编写网络广播的程序
💻 C
字号:
/* mdtcp-srv.c -- server tcp client-handling thread *   * This file is part of 'netcast' program, released under BSD License.  * (c) 2001-2002 Stanis砤w Pa秌o <staszek@nutki.com>. All rights reserved.    */#include <unistd.h>#include <stdlib.h>#include <sys/types.h>#include <sys/socket.h>#include <arpa/inet.h>#include <fcntl.h>#include <sys/poll.h>#include <string.h>#include <pthread.h>#include "mdist.h"#include "mdtcp.h"#include "queue.h"#include "util.h"#include "server.h"struct cdata {   struct mmsgq *tx;  struct mmsgq *rx;  int await;  struct sockaddr_in peer;};/* listen socket - global for all threads */int sfd=-1;/* disconnect client and compress table */# define kill_client(i,c) do { close(ds[i].fd); qempty(cd[i].rx); \  qempty(cd[i].tx); ds[i]=ds[c-1]; cd[i]=cd[c-1]; i--; c--; client_count--; } while (0)void *mdtcp_srv_main(void *args) {    int s,i,c,k;  long t,o,e=0;  struct pollfd ds[MAX_CLIENTS+1];  struct cdata cd[MAX_CLIENTS+1];   char buf[MDTCP_BUFFS];  void *cpy;  struct sempack *msp,*slave=0;  pthread_t child;  uint8_t msgtype;  int state=S_SYNC;  int sync_count=0;  struct mdreq *req;  struct mdmsg *ans;  uint32_t *rqb;    msp = (struct sempack *) args;    for (i=0;i<MAX_CLIENTS+1;i++) {     ds[i].fd=-1;     ds[i].events=POLLIN;    cd[i].tx=qinit();     cd[i].rx=qinit();     cd[i].await=0;  }  c=1;  P(msp->init);    if (sfd<0) {    ds[0].fd=mdtcp_open_server(0,server_port);    set_nonblock(ds[0].fd);    sfd=ds[0].fd;  } else ds[0].fd=sfd;  ds[0].events=POLLIN;  while (c<=MAX_CLIENTS && client_count<wait_count) {    /* Not accepting any messages from clients */     s=poll(ds,1,-1);    if (s<0) crit("poll() failed");    if (s==0) { MSG(2,"poll() returned with no results!"); continue; }    MSG(9,"poll: %d events",s);    /* check master socket */    if (ds[0].revents) {      /* presumably some new connections */      if (ds[0].revents & (POLLIN|POLLPRI)) do {        k=sizeof(cd[c].peer);        ds[c].fd=accept(ds[0].fd,(struct sockaddr *)&cd[c].peer,&k);        k=0;        if (ds[c].fd<0 && errno!=EAGAIN) MSG(2,"accept() returned with error");        if (ds[c].fd>=0) {                  k=1;          /* Send initialization data now -- blocking, but short */          if (write(ds[c].fd,&mdini,sizeof(struct mdinit))<0 ||               fcntl(ds[c].fd,F_SETFL,O_NONBLOCK)<0) {            MSG(3,"Error opening connection: %s",inet_ntoa(cd[c].peer.sin_addr));            close(ds[c].fd);          } else {            MSG(5,"New connection %d: %s",c,inet_ntoa(cd[c].peer.sin_addr));            c++;            client_count++;          }        }       } while (k && c<=MAX_CLIENTS); else         MSG(2,"Error on tcp socket");      s--;      ds[0].revents=0;    }  }  ds[0].events=0;  if (client_count<wait_count) {    slave=spawn_server(&child,mdtcp_srv_main,0);    V(slave->init);    P(slave->accept);  }    V(msp->accept);  /* all expected clients are now connected */  o=1;  while (1) {    if (o) {      P(msp->prepare);      if (slave) V(slave->prepare);      /* copy header to all queues */      e=sizeof(struct mdctl);       for (i=1;i<c;i++) {        cpy=malloc(e);        memcpy(cpy,&mdhead,e);        qappend(cd[i].tx,cpy,e);        ds[i].events=ds[i].events|POLLOUT;      }      o=0;    }        if (c==1) {       switch(state) {        case S_SYNC:           if (slave) P(slave->ready);          V(msp->ready);          P(msp->prepare);          if (slave) V(slave->prepare);        case S_SEND:          o=mdhead.pkt;          if (slave) P(slave->finish);          V(msp->finish);          state=S_SYNC;          if (!o) {            MSG(5,"TCP server finished");            if (slave && pthread_join(child,0)<0) ERROR("pthread_join()");            if (!slave) close(sfd);            return 0;          }      }      continue;    }                   s=poll(&ds[1],c-1,-1);    if (s<0) crit("poll() failed");    if (s==0) { MSG(2,"poll() returned with no results!"); continue; }    MSG(9,"poll: %d events",s);        /* check all connected clients */    for (i=1;s && (i<c);i++) if (ds[i].revents) {      if (ds[i].revents & (POLLERR|POLLHUP|POLLNVAL)) {        MSG(5,"Closing connection %d: %s",i,inet_ntoa(cd[i].peer.sin_addr));        kill_client(i,c);      } else if (ds[i].revents & (POLLIN|POLLPRI)) {        /* Await is set only for repeat request */        if (cd[i].await) {          e=read(ds[i].fd,buf,cd[i].await);          if (e<=0) {             MSG(5,"Client disconnected (r): %s",inet_ntoa(cd[i].peer.sin_addr));            kill_client(i,c);          }          if (e>0) {            MSG(7,"New data (%ld+%ld) on conxn %d: %s",e,cd[i].rx->size,i,inet_ntoa(cd[i].peer.sin_addr));            cpy=malloc(e);            memcpy(cpy,buf,e);            qappend(cd[i].rx,cpy,e);            cd[i].await-=e;            /* Full header received */            if (cd[i].rx->size==sizeof(struct mdreq)) {              if (cd[i].rx->count>1) {                cpy=qflat(cd[i].rx);                qappend(cd[i].rx,cpy,e);              } else cpy=qfirst(cd[i].rx,&e);              req = (struct mdreq *) cpy;              cd[i].await=req->rqc*sizeof(uint32_t);              MSG(6,"Client request for %u packets on %d: %s",req->rqc,i,inet_ntoa(cd[i].peer.sin_addr));            }            /* Whole request received */            if (cd[i].await==0) {              cpy = qflat(cd[i].rx);              req = (struct mdreq *) cpy;              rqb = (uint32_t *) (cpy + sizeof(struct mdreq));              for (t=0;t<req->rqc;t++) {                ans=md_get(rqb[t]);                e=ans->siz+sizeof(struct mdmsg);                MSG(6,"Sending packet %u (%u bytes) on %d",rqb[t],ans->siz,i);                cpy=malloc(e);                memcpy(cpy,ans,e);                qappend(cd[i].tx,cpy,e);              }              if (t>0) ds[i].events=ds[i].events|POLLOUT;            }          }                  } else {          /* read message type*/          e=read(ds[i].fd,&msgtype,1);          if (e<=0) {             MSG(5,"Client disconnected (r): %s",inet_ntoa(cd[i].peer.sin_addr));            kill_client(i,c);          } else switch (msgtype) {            case CLIENT_READY:              if (state==S_SYNC) {                sync_count++;                MSG(7,"Client SYNC %d of %d",sync_count,c-1);                if (sync_count==c-1) {                  if (slave) P(slave->ready);                  V(msp->ready);                  P(msp->prepare);                  if (slave) V(slave->prepare);                  /* Push clients */                  for (i=1;i<c;i++) {                    cpy=malloc(1);                    memcpy(cpy,"!",1);                    qappend(cd[i].tx,cpy,e);                    ds[i].events=ds[i].events|POLLOUT;                  }                  state=S_SEND;                                  }              }              break;            case CLIENT_START:              if (state==S_SEND) {                sync_count--;                MSG(7,"Client START %d of %d",sync_count,c-1);                if (sync_count==0) {                  o=mdhead.pkt;                  if (slave) P(slave->finish);                  V(msp->finish);                  state=S_SYNC;                  if (!o) {                    MSG(5,"TCP server finished");                    if (slave && pthread_join(child,0)<0) ERROR("pthread_join()");                    i=c-1;                    while (i>0) {                      shutdown(ds[i].fd,2);                      i--;                    }                    if (!slave) close(sfd);                    return 0;                  }                }              }              break;            case CLIENT_REQ:              cd[i].await=sizeof(struct mdreq);              break;            default:               MSG(4,"Wrong message type from %s",inet_ntoa(cd[i].peer.sin_addr));          }                             }      } else if (ds[i].revents & POLLOUT) {        if (cd[i].tx->count) {          /* if there is some data to be sent, try to do it now */          cpy=qfetch(cd[i].tx,&t);                    memcpy(buf,cpy,t);          free(cpy);          e=write(ds[i].fd,buf,t);          if (e<=0) {            MSG(5,"Client disconnected (w): %s",inet_ntoa(cd[i].peer.sin_addr));            kill_client(i,c);          }          if (e!=t) {            MSG(6,"Partial write: %ld out of %ld bytes sent",e,t);            cpy=malloc(t-e);            memcpy(cpy,buf+e,t-e);            qinsert(cd[i].tx,cpy,t-e);          } else {            MSG(7,"Sent %ld bytes to %s",e,inet_ntoa(cd[i].peer.sin_addr));          }          if (!cd[i].tx->count)              ds[i].events=POLLIN;        }      }    }  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -