📄 mdtcp-srv.c
字号:
/* mdtcp-srv.c -- server tcp client-handling thread * * This file is part of 'netcast' program, released under BSD License. * (c) 2001-2002 Stanis砤w Pa秌o <staszek@nutki.com>. All rights reserved. */#include <unistd.h>#include <stdlib.h>#include <sys/types.h>#include <sys/socket.h>#include <arpa/inet.h>#include <fcntl.h>#include <sys/poll.h>#include <string.h>#include <pthread.h>#include "mdist.h"#include "mdtcp.h"#include "queue.h"#include "util.h"#include "server.h"struct cdata { struct mmsgq *tx; struct mmsgq *rx; int await; struct sockaddr_in peer;};/* listen socket - global for all threads */int sfd=-1;/* disconnect client and compress table */# define kill_client(i,c) do { close(ds[i].fd); qempty(cd[i].rx); \ qempty(cd[i].tx); ds[i]=ds[c-1]; cd[i]=cd[c-1]; i--; c--; client_count--; } while (0)void *mdtcp_srv_main(void *args) { int s,i,c,k; long t,o,e=0; struct pollfd ds[MAX_CLIENTS+1]; struct cdata cd[MAX_CLIENTS+1]; char buf[MDTCP_BUFFS]; void *cpy; struct sempack *msp,*slave=0; pthread_t child; uint8_t msgtype; int state=S_SYNC; int sync_count=0; struct mdreq *req; struct mdmsg *ans; uint32_t *rqb; msp = (struct sempack *) args; for (i=0;i<MAX_CLIENTS+1;i++) { ds[i].fd=-1; ds[i].events=POLLIN; cd[i].tx=qinit(); cd[i].rx=qinit(); cd[i].await=0; } c=1; P(msp->init); if (sfd<0) { ds[0].fd=mdtcp_open_server(0,server_port); set_nonblock(ds[0].fd); sfd=ds[0].fd; } else ds[0].fd=sfd; ds[0].events=POLLIN; while (c<=MAX_CLIENTS && client_count<wait_count) { /* Not accepting any messages from clients */ s=poll(ds,1,-1); if (s<0) crit("poll() failed"); if (s==0) { MSG(2,"poll() returned with no results!"); continue; } MSG(9,"poll: %d events",s); /* check master socket */ if (ds[0].revents) { /* presumably some new connections */ if (ds[0].revents & (POLLIN|POLLPRI)) do { k=sizeof(cd[c].peer); ds[c].fd=accept(ds[0].fd,(struct sockaddr *)&cd[c].peer,&k); k=0; if (ds[c].fd<0 && errno!=EAGAIN) MSG(2,"accept() returned with error"); if (ds[c].fd>=0) { k=1; /* Send initialization data now -- blocking, but short */ if (write(ds[c].fd,&mdini,sizeof(struct mdinit))<0 || fcntl(ds[c].fd,F_SETFL,O_NONBLOCK)<0) { MSG(3,"Error opening connection: %s",inet_ntoa(cd[c].peer.sin_addr)); close(ds[c].fd); } else { MSG(5,"New connection %d: %s",c,inet_ntoa(cd[c].peer.sin_addr)); c++; client_count++; } } } while (k && c<=MAX_CLIENTS); else MSG(2,"Error on tcp socket"); s--; ds[0].revents=0; } } ds[0].events=0; if (client_count<wait_count) { slave=spawn_server(&child,mdtcp_srv_main,0); V(slave->init); P(slave->accept); } V(msp->accept); /* all expected clients are now connected */ o=1; while (1) { if (o) { P(msp->prepare); if (slave) V(slave->prepare); /* copy header to all queues */ e=sizeof(struct mdctl); for (i=1;i<c;i++) { cpy=malloc(e); memcpy(cpy,&mdhead,e); qappend(cd[i].tx,cpy,e); ds[i].events=ds[i].events|POLLOUT; } o=0; } if (c==1) { switch(state) { case S_SYNC: if (slave) P(slave->ready); V(msp->ready); P(msp->prepare); if (slave) V(slave->prepare); case S_SEND: o=mdhead.pkt; if (slave) P(slave->finish); V(msp->finish); state=S_SYNC; if (!o) { MSG(5,"TCP server finished"); if (slave && pthread_join(child,0)<0) ERROR("pthread_join()"); if (!slave) close(sfd); return 0; } } continue; } s=poll(&ds[1],c-1,-1); if (s<0) crit("poll() failed"); if (s==0) { MSG(2,"poll() returned with no results!"); continue; } MSG(9,"poll: %d events",s); /* check all connected clients */ for (i=1;s && (i<c);i++) if (ds[i].revents) { if (ds[i].revents & (POLLERR|POLLHUP|POLLNVAL)) { MSG(5,"Closing connection %d: %s",i,inet_ntoa(cd[i].peer.sin_addr)); kill_client(i,c); } else if (ds[i].revents & (POLLIN|POLLPRI)) { /* Await is set only for repeat request */ if (cd[i].await) { e=read(ds[i].fd,buf,cd[i].await); if (e<=0) { MSG(5,"Client disconnected (r): %s",inet_ntoa(cd[i].peer.sin_addr)); kill_client(i,c); } if (e>0) { MSG(7,"New data (%ld+%ld) on conxn %d: %s",e,cd[i].rx->size,i,inet_ntoa(cd[i].peer.sin_addr)); cpy=malloc(e); memcpy(cpy,buf,e); qappend(cd[i].rx,cpy,e); cd[i].await-=e; /* Full header received */ if (cd[i].rx->size==sizeof(struct mdreq)) { if (cd[i].rx->count>1) { cpy=qflat(cd[i].rx); qappend(cd[i].rx,cpy,e); } else cpy=qfirst(cd[i].rx,&e); req = (struct mdreq *) cpy; cd[i].await=req->rqc*sizeof(uint32_t); MSG(6,"Client request for %u packets on %d: %s",req->rqc,i,inet_ntoa(cd[i].peer.sin_addr)); } /* Whole request received */ if (cd[i].await==0) { cpy = qflat(cd[i].rx); req = (struct mdreq *) cpy; rqb = (uint32_t *) (cpy + sizeof(struct mdreq)); for (t=0;t<req->rqc;t++) { ans=md_get(rqb[t]); e=ans->siz+sizeof(struct mdmsg); MSG(6,"Sending packet %u (%u bytes) on %d",rqb[t],ans->siz,i); cpy=malloc(e); memcpy(cpy,ans,e); qappend(cd[i].tx,cpy,e); } if (t>0) ds[i].events=ds[i].events|POLLOUT; } } } else { /* read message type*/ e=read(ds[i].fd,&msgtype,1); if (e<=0) { MSG(5,"Client disconnected (r): %s",inet_ntoa(cd[i].peer.sin_addr)); kill_client(i,c); } else switch (msgtype) { case CLIENT_READY: if (state==S_SYNC) { sync_count++; MSG(7,"Client SYNC %d of %d",sync_count,c-1); if (sync_count==c-1) { if (slave) P(slave->ready); V(msp->ready); P(msp->prepare); if (slave) V(slave->prepare); /* Push clients */ for (i=1;i<c;i++) { cpy=malloc(1); memcpy(cpy,"!",1); qappend(cd[i].tx,cpy,e); ds[i].events=ds[i].events|POLLOUT; } state=S_SEND; } } break; case CLIENT_START: if (state==S_SEND) { sync_count--; MSG(7,"Client START %d of %d",sync_count,c-1); if (sync_count==0) { o=mdhead.pkt; if (slave) P(slave->finish); V(msp->finish); state=S_SYNC; if (!o) { MSG(5,"TCP server finished"); if (slave && pthread_join(child,0)<0) ERROR("pthread_join()"); i=c-1; while (i>0) { shutdown(ds[i].fd,2); i--; } if (!slave) close(sfd); return 0; } } } break; case CLIENT_REQ: cd[i].await=sizeof(struct mdreq); break; default: MSG(4,"Wrong message type from %s",inet_ntoa(cd[i].peer.sin_addr)); } } } else if (ds[i].revents & POLLOUT) { if (cd[i].tx->count) { /* if there is some data to be sent, try to do it now */ cpy=qfetch(cd[i].tx,&t); memcpy(buf,cpy,t); free(cpy); e=write(ds[i].fd,buf,t); if (e<=0) { MSG(5,"Client disconnected (w): %s",inet_ntoa(cd[i].peer.sin_addr)); kill_client(i,c); } if (e!=t) { MSG(6,"Partial write: %ld out of %ld bytes sent",e,t); cpy=malloc(t-e); memcpy(cpy,buf+e,t-e); qinsert(cd[i].tx,cpy,t-e); } else { MSG(7,"Sent %ld bytes to %s",e,inet_ntoa(cd[i].peer.sin_addr)); } if (!cd[i].tx->count) ds[i].events=POLLIN; } } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -