/* sessions.c */
/* (code-viewer residue removed from this spot: a "font size" control label) */
/*
* Openmysee
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
*/
/* Extra bytes allocated beyond MAX_DATA for every buffer created by NEW(). */
#define FENCE_DATA 1024
/* Main-loop flag: process_child() iterates while this is > 0 and
 * terminate() decrements it. */
int IN_LOOP = 1;
/* Head of the singly linked free list of recycled buffers; NEW() pops from
 * it, FREE() pushes onto it and closure_cache() releases the whole list. */
struct cachetype *BufferCacheHead;
/*
 * Hand out one buffer.
 *
 * A buffer previously returned through FREE() is reused when the free list
 * (BufferCacheHead) is not empty; otherwise a zero-filled block of
 * MAX_DATA+FENCE_DATA bytes is allocated.  A recycled buffer is returned
 * as-is (its contents are not cleared here).
 *
 * Returns the buffer; an allocation failure trips the assert.
 */
inline char *NEW ()
{
    char *buf;

    if (BufferCacheHead == NULL)
    {
        /* Nothing to recycle: get a fresh, zeroed block. */
        buf = calloc (1, MAX_DATA+FENCE_DATA);
    }
    else
    {
        /* Pop the first recycled buffer off the free list. */
        buf = (char *) BufferCacheHead;
        BufferCacheHead = BufferCacheHead->next;
    }
    assert (buf != NULL);
    return buf;
}
/*
 * Give a buffer back to the cache instead of releasing it to the allocator.
 *
 * The start of the buffer is overwritten with the free-list link and the
 * buffer becomes the new head of BufferCacheHead.  A NULL pointer is ignored.
 */
inline void FREE (void *p)
{
    struct cachetype *node = p;

    if (node != NULL)
    {
        node->next = BufferCacheHead;
        BufferCacheHead = node;
    }
}
/*
 * Release every buffer currently parked on the free list and leave the
 * list empty.
 */
inline void closure_cache ()
{
    struct cachetype *cur = BufferCacheHead;

    while (cur != NULL)
    {
        /* Read the link before the node is released. */
        struct cachetype *following = cur->next;

        free (cur);
        cur = following;
    }
    BufferCacheHead = NULL;
}
/*
 * Request shutdown of the main loop by lowering IN_LOOP by one;
 * process_child() stops iterating once IN_LOOP is no longer positive.
 * The `sig` argument is not used.
 * NOTE(review): the signature suggests this is installed as a signal
 * handler - confirm at the registration site.
 */
void terminate (int sig)
{
    (void) sig;
    IN_LOOP -= 1;
}
/*
 * Accept one pending connection on the listening socket `sock` and register
 * it in the session table of TRACKER[type].
 *
 * The new socket gets SO_LINGER (on, timeout 0, when available),
 * SO_KEEPALIVE and O_NONBLOCK.  If the table already holds TRACKER[type].max
 * sessions, an ERR_CONNECTION_FULL message is written to the peer and the
 * socket is closed.  Otherwise the first slot whose `socket` field is 0 is
 * filled in, the descriptor is added to `osocks`, and TRACKER[type].init is
 * invoked for that slot.
 *
 * Returns -1 when the connection could not be accepted or registered,
 * otherwise whatever TRACKER[type].init returns for the slot.
 */
int handle_new_connection(int sock, int type)
{
    struct sockaddr_in addr;
    socklen_t addrlen = sizeof (struct sockaddr_in);
    int listnum, max, flags;
    struct Session *head;
#ifdef SO_LINGER
    struct linger ling;
#endif
    int newconn = accept(sock, (struct sockaddr *)(&addr), &addrlen);
    int keepalive = 1;

    if (newconn < 0)
    {
        perror("accept");
        return -1;
    }
    /*
     * An fd_set can only describe descriptors below FD_SETSIZE; FD_SET on a
     * larger value writes outside the set.  Refuse such a connection instead
     * of corrupting memory.
     */
    if (newconn >= FD_SETSIZE)
    {
        PDEBUG ("descriptor %d is beyond FD_SETSIZE, dropping client type %d.\n", newconn, type);
        close (newconn);
        return -1;
    }
#ifdef SO_LINGER
    /* Linger enabled with a zero timeout. */
    ling.l_onoff = 1;
    ling.l_linger = 0;
    setsockopt (newconn, SOL_SOCKET, SO_LINGER, &ling, sizeof (struct linger));
#endif
    setsockopt (newconn, SOL_SOCKET, SO_KEEPALIVE, &keepalive, sizeof (keepalive));
    flags = fcntl(newconn, F_GETFL, 0);
    flags |= O_NONBLOCK;
    fcntl(newconn, F_SETFL, flags);

    if (TRACKER[type].cur == TRACKER[type].max)
    {
        /*
         * Table full: send a small message laid out as
         * [int total length][char P2P_MSG][short ERR_CONNECTION_FULL][char 1]
         * and drop the connection.
         */
        char *buf, buffer[MAX_DATA];
        buf = buffer+sizeof (int);
        *(unsigned char *)buf = P2P_MSG;
        buf += sizeof (char);
        *(unsigned short *)buf = ERR_CONNECTION_FULL;
        buf += sizeof (short);
        *(unsigned char *)buf = 1;
        buf += sizeof (char);
        *(unsigned int *)buffer = buf - buffer;
        write (newconn, buffer, *(unsigned int *)buffer);
        close (newconn);
        return -1;
    }

    head = TRACKER[type].head;
    max = TRACKER[type].max;
    for (listnum = 0; listnum < max; listnum ++)
    {
        /* A slot whose socket field is 0 is free. */
        if(head[listnum].socket == 0)
        {
            head[listnum].socket = newconn;
            head[listnum].type = type;
            getpeername (newconn, (struct sockaddr *)(&addr), &addrlen);
            head[listnum].host = ntohl(addr.sin_addr.s_addr);
            head[listnum].port = ntohs(addr.sin_port);
            head[listnum].buf = NEW ();
#ifdef __CP_SOURCE
            head[listnum].totalup = 0;
#endif
            head[listnum].time_sec = CurrentTime;
            head[listnum].last_transferblock = CurrentTime;
            FD_SET(newconn, &osocks);
            /* Track the highest slot index in use. */
            if (listnum > TRACKER[type].maxid)
                TRACKER[type].maxid = listnum;
            break;
        }
    }
    if (listnum >= max)
    {
        PDEBUG ("no space left for incoming client type %d.", type);
        close (newconn);
        return -1;
    }
    TRACKER[type].cur ++;
    return (*(TRACKER[type].init)) (listnum);
}
/*
 * Close session slot `listnum` of TRACKER[type] through the type's closure
 * callback, then lower TRACKER[type].maxid while it points at the slot being
 * examined and that slot is empty (slot 0 is never stepped past).
 *
 * Returns the session count of the type after it has been decremented.
 */
int Clientclosure (int listnum, int type)
{
    int slot = listnum;

    (*(TRACKER[type].closure)) (listnum);

    for (;;)
    {
        if (TRACKER[type].maxid != slot)
            break;
        if (TRACKER[type].head[slot].socket != 0)
            break;
        if (slot <= 0)
            break;
        slot--;
        TRACKER[type].maxid--;
    }

    TRACKER[type].cur -= 1;
    return TRACKER[type].cur;
}
/* Declared noreturn and as a destructor (GCC attribute). */
void my_exit() __attribute__((noreturn, destructor));
#ifdef __SP_SOURCE
extern struct Channel *ProgramHash[MAX_CHANNEL];
#endif
extern void freeJobCache ();
/*
 * Shut the process down: close every live session of every type (highest
 * type first) through the type's closure callback, free the session tables,
 * close the listening sockets, release channel/program/config/job/buffer
 * caches, remove the PID file, close the log and call exit(err).
 *
 * The session table pointer is cleared once it has been freed.  Without
 * that, any later pass through this function would walk and free() the
 * released table again.
 * NOTE(review): the function carries the destructor attribute and itself
 * calls exit(), so it may be entered again while exit() is running, with
 * an unspecified `err` - confirm whether the destructor attribute is
 * intended.
 */
void my_exit (int err)
{
    int max, listnum, type;
    struct Session *head;
    int (*closure) (int);

    for (type=MAX_TYPE-1; type >=0; type --)
    {
        max = TRACKER[type].maxid+1;
        closure = TRACKER[type].closure;
        if ((head = TRACKER[type].head) != NULL)
        {
            for (listnum=0; listnum<max; listnum++)
            {
                if (head[listnum].socket > 0)
                    (*closure) (listnum);
            }
            free (head);
            /* Do not leave a dangling table pointer behind. */
            TRACKER[type].head = NULL;
        }
#ifdef __CP_SOURCE
        if (TRACKER[type].flag == FLAG_SERVER) close (TRACKER[type].sock);
#endif
#ifdef __SP_SOURCE
        close (TRACKER[type].sock);
#endif
    }
#ifdef __CP_SOURCE
#ifdef HAVE_TS
    closure_TS ();
    close (TSSOCK);
#endif
#endif
    PDEBUG ("exit...\n");
    freeAllChannel ();
#ifdef __SP_SOURCE
    db_end ();
    timer_free ();
    freeAllProgram ();
    freeAllOrder ();
#endif
    free_config (ConfigParameters, sizeof(ConfigParameters)/sizeof(struct NamVal));
    closure_cache ();
    freeJobCache ();
    unlink (PIDFile);
    CLOSELOG;
    exit (err);
}
/*
 * Check whether the message at p->buf + p->start is complete.
 *
 * The first sizeof(int) bytes of a message hold its length.
 * Returns:
 *    0  - the slot is unused, fewer than sizeof(int)+sizeof(char) bytes are
 *         buffered, or the buffered bytes do not yet cover the length;
 *   -1  - the stated length is MAX_BLOCK_SIZE or more;
 *   otherwise the stated length of the complete message.
 */
inline int checkComplete (struct Session *p)
{
    int msglen;

    if (p->socket == 0)
        return 0;
    if (p->off < sizeof (int)+sizeof(char))
        return 0;

    msglen = *(unsigned int *)(p->buf+p->start);
    if (msglen >= MAX_BLOCK_SIZE)
        return -1;

    if (p->off >= msglen)
        return msglen;
    return 0;
}
/*
 * Copy `len` bytes from `src` to `dst`; the two regions may overlap in
 * either direction.
 *
 * The previous byte-by-byte forward loop was only correct when `dst` lay
 * below `src`; with `dst` above `src` inside the source range it overwrote
 * bytes before they had been read.  memmove() (from <string.h>, which this
 * file already relies on for memcpy) handles both directions.
 *
 * A non-positive `len` or identical pointers make the call a no-op.
 *
 * The function is no longer declared plain `inline`: under C99/C11 rules a
 * plain `inline` definition provides no external definition, so calls that
 * are not inlined fail to link.
 */
void my_memmove (char *dst, char *src, int len)
{
    if (len <= 0 || dst == src)
        return;
    memmove (dst, src, (size_t) len);
}
/*
 * Consume `len` bytes of the session buffer once a message has been
 * processed: the unread region begins `len` bytes later (p->start) and is
 * `len` bytes shorter (p->off).  Nothing happens for an unused slot
 * (socket == 0).  Always returns 0.
 */
inline int updateBuf (struct Session *p, int len)
{
    if (p->socket != 0)
    {
        p->off -= len;
        p->start += len;
    }
    return 0;
}
/*
 * Service every session slot of TRACKER[type] after select() has returned.
 *
 * For each slot from 0 to maxid:
 *   - (CP build) a session with flag > 0 whose time_sec is more than
 *     MAX_CONN behind CurrentTime is handed to reconnect(); the slot is
 *     closed if that fails, and the slot is skipped for this round;
 *   - if its socket is set in `wsocks`, queued jobs are written with
 *     processJobs();
 *   - if its socket is set in `socks`, bytes are read into the session
 *     buffer;
 *   - every complete message in the buffer is passed to the type's
 *     `process` callback.
 * `esocks` is accepted but not used in this function.
 */
void process_type (int type, fd_set *socks, fd_set *wsocks, fd_set *esocks)
{
struct ServerDesc *ser = &(TRACKER[type]);
struct Session *p;
int (*process)(int listnum);
int listnum, max, this_read, this_write;
max = ser->maxid + 1;
process = ser->process; /* per-type message handler, called with the slot index */
for (listnum = 0; listnum < max; listnum++)
{
p = &(ser->head[listnum]);
#ifdef __CP_SOURCE
/* NOTE(review): flag > 0 presumably marks a connection still being
 * established - confirm against reconnect() and its callers. */
if (p->flag > 0 && (CurrentTime - p->time_sec) > MAX_CONN)
{
if (reconnect (p) < 0)
{
PDEBUG("Reconnect Error.\n");
Clientclosure (listnum, type);
}
continue;
}
#endif
/* Write side: the socket was reported writable. */
if (p->socket != 0 && FD_ISSET (p->socket, wsocks))
{
#ifdef __CP_SOURCE
struct sockaddr_in addr;
socklen_t addrlen = sizeof (struct sockaddr_in);
if (p->flag != 0)
{
/* First writable event for a flagged session: clear the flag and
 * refresh the peer port. */
p->flag = 0;
getpeername (p->socket, (struct sockaddr *)(&addr), &addrlen);
p->port = ntohs(addr.sin_port);
/* restore file status flags */
fcntl(p->socket, F_SETFL, p->sock_flag);
}
#endif
if ((this_write = processJobs (p)) < 0)
{
/* A negative result from processJobs() closes the session. */
perror ("PP: Write");
Clientclosure (listnum, type);
} else
{
// PDEBUG ("Send %d to %d\n", this_write, listnum);
/* Account the bytes reported by processJobs() as upstream traffic. */
tmpUpBytes += this_write;
#ifdef __CP_SOURCE
p->totalup += this_write;
#endif
}
// continue;
}
/* Read side: the socket was reported readable (and is still open). */
if (p->socket != 0 && FD_ISSET (p->socket, socks))
{
/* Unread data reaches the end of the buffer: slide it to the front. */
if (p->start + p->off >= MAX_DATA-1)
{
my_memmove (p->buf, p->buf+p->start, p->off);
p->start = 0;
} else if (p->off == 0)
/* Buffer fully consumed: restart at the front. */
p->start = 0;
/* Append after the unread data; a result <= 0 (error or end of stream)
 * closes the session. */
if ((this_read = read(p->socket, p->buf+p->start+p->off, MAX_DATA -p->start- p->off)) <= 0)
{
perror("PP: read err");
PDEBUG("socket: %d, p->start: %d. p->off: %d. \n", p->socket, p->start, p->off);
Clientclosure (listnum, type);
} else
{
p->off += this_read;
}
}
/* Dispatch each complete message; this_read now holds the value returned
 * by checkComplete().  A handler result of -2 stops the loop without
 * consuming the message. */
while (p->socket != 0 && (this_read = checkComplete (p)) > 0)
{
if ((*process) (listnum) == -2) break;
updateBuf (p, this_read);
}
if (p->socket != 0)
{
/* checkComplete() returned a negative value (or an implausible length):
 * drop the session. */
if (this_read < 0 || this_read > MAX_DATA)
{
PDEBUG ("Too long message, cut off, length is %d, and p->off is %d, p->start is %d.\n", this_read, p->off, p->start);
Clientclosure (listnum, type);
}
}
}
}
/*
 * Main event loop.  Runs while IN_LOOP is positive.
 *
 * Each round rebuilds the read/write/exception descriptor sets from the
 * TS socket(s), the listening sockets and every live session, waits in
 * select() for at most 10 ms, then accepts new connections and services
 * the sessions of each type.  period_process() is called at the end of
 * every round, whether or not select() reported any activity.
 */
void process_child (void)
{
int readsocks; /* return value of select() */
struct Session *head;
int highsock;
fd_set socks, wsocks, esocks;
int type, listnum, max;
struct timeval tm;
#ifdef __CP_SOURCE
time_t last_handle_conn = 0;
#endif
startTime = time(NULL);
while (IN_LOOP > 0)
{
FD_ZERO(&socks);
FD_ZERO(&esocks);
FD_ZERO(&wsocks);
#ifdef __CP_SOURCE
#ifdef HAVE_TS
highsock = TSSOCK;
FD_SET(TSSOCK, &socks);
#else
highsock = 0;
#endif
#endif
CurrentTime = time (NULL);
#ifdef __SP_SOURCE
highsock = 0;
/* Watch every open TS socket for readability. */
for(type=0; type<MAX_TS; type++)
{
if (tsSock[type] <= 0) continue;
if (tsSock[type] > highsock)
highsock = tsSock[type];
FD_SET(tsSock[type], &socks);
}
#endif
for (type=0; type<MAX_TYPE; type++) /* one pass per session type */
{
#ifdef __CP_SOURCE
if (TRACKER[type].flag == FLAG_SERVER && highsock < TRACKER[type].sock)
highsock = TRACKER[type].sock;
/* Only refreshes the timestamp; nothing else in this function reads
 * last_handle_conn. */
if(CurrentTime - last_handle_conn >= 1)
{
last_handle_conn = CurrentTime;
}
if (TRACKER[type].flag == FLAG_SERVER) FD_SET(TRACKER[type].sock, &socks);
#endif
#ifdef __SP_SOURCE
if (highsock < TRACKER[type].sock)
highsock = TRACKER[type].sock;
FD_SET(TRACKER[type].sock, &socks);
#endif
max = TRACKER[type].maxid + 1;
head = TRACKER[type].head;
for (listnum = 0; listnum < max; listnum++)
{
if (head[listnum].socket != 0)
{
/* A session whose `head` field is set is watched for writability
 * only; all others are watched for readability.
 * NOTE(review): `head` is presumably the pending job queue - confirm. */
if (head[listnum].head)
FD_SET(head[listnum].socket, &wsocks);
else
FD_SET(head[listnum].socket, &socks);
if (head[listnum].socket > highsock)
highsock = head[listnum].socket;
}
}
}
/* Wait at most 10 ms for activity. */
tm.tv_sec = 0;
tm.tv_usec = 10000;
readsocks = select(highsock+1, &socks, &wsocks, &esocks, &tm);
/* Timeout or error: skip straight to the periodic work. */
if (readsocks <= 0)
goto NEXTROUND;
#ifdef __CP_SOURCE
#ifdef HAVE_TS
if (FD_ISSET(TSSOCK, &socks))
process_TS ();
if (FD_ISSET(TSSOCK, &esocks))
{
PDEBUG ("exit...\n");
exit (errno);
}
#endif
#endif
#ifdef __SP_SOURCE
for(type=0; type < MAX_TS; ++type)
{
if(tsSock[type] > 0 && FD_ISSET(tsSock[type], &socks))
process_TS2RM(type); //
}
#endif
for (type=0; type<MAX_TYPE; type++)
{
/* The build-specific `if` selected below guards the
 * handle_new_connection() call that follows it. */
#ifdef __CP_SOURCE
if (TRACKER[type].flag == FLAG_SERVER && FD_ISSET(TRACKER[type].sock, &socks))
#endif
#ifdef __SP_SOURCE
if (FD_ISSET(TRACKER[type].sock, &socks))
#endif
handle_new_connection (TRACKER[type].sock, type);
/* An exceptional condition on a listening socket terminates the process. */
#ifdef __CP_SOURCE
if (TRACKER[type].flag == FLAG_SERVER && FD_ISSET(TRACKER[type].sock, &esocks))
#endif
#ifdef __SP_SOURCE
if (FD_ISSET(TRACKER[type].sock, &esocks))
#endif
{
PDEBUG ("exit...\n");
exit (errno);
}
process_type (type, &socks, &wsocks, &esocks);
}
NEXTROUND:
period_process ();
}
}
/*
 * Queue a job whose buffer already contains the message.
 *
 * The message length is read from the first int of ptr->buffer, added to
 * ptr->len, and the job is handed to addJob() for session `p` and channel
 * `pc`.
 */
inline void writeDATAMessage (struct Session *p, struct Channel *pc, struct JobDes *ptr)
{
    int msglen = *(int *)(ptr->buffer);

    ptr->len += msglen;
    addJob (p, pc, ptr);
}
/*
 * Copy the message at `ptr` into a job buffer for session `p` / channel `pc`.
 *
 * The message length is taken from its first int.  A job returned by
 * findEnoughBuffer() is extended when one is found; otherwise a new job is
 * obtained from newJob() and queued with addJob() after the copy.
 *
 * Returns 0 on success, -1 when no job could be obtained.
 * NOTE(review): the capacity reported by getJobBuffer() is not compared
 * with the message length before the copy - confirm that callers guarantee
 * the message fits.
 */
int writeMessage (struct Session *p, struct Channel *pc, char *ptr)
{
    struct JobDes *job;
    char *dest;
    int capacity = 0;
    int fresh = 0;
    int msglen = *(int *)ptr;

    job = findEnoughBuffer (p, pc, msglen);
    if (job == NULL)
    {
        job = newJob ();
        if (job == NULL)
            return -1;
        fresh = 1;
    }

    dest = getJobBuffer (job, &capacity);
    memcpy (dest, ptr, msglen);
    job->len += msglen;

    /* Only a newly created job still has to be queued. */
    if (fresh)
        addJob (p, pc, job);
    return 0;
}
/*
 * Create an edge linking channel `head` and session `me`, and push it onto
 * the front of both the channel's peer list (PeerHead, chained by `cnext`)
 * and the session's edge list (header, chained by `enext`).
 *
 * Returns the new edge, or NULL when memory could not be allocated (in
 * which case neither list is modified).  The edge is released by delEdge().
 */
struct Edge *newEdge (struct Channel *head, struct Session *me)
{
    struct Edge *result = malloc (sizeof *result);

    /* Do not write through a failed allocation. */
    if (result == NULL)
        return NULL;

    result->head = head;
    result->me = me;
    result->cnext = head->PeerHead;
    head->PeerHead = result;
    result->enext = me->header;
    me->header = result;
    return result;
}
/*
 * Unlink edge `e` from its channel's peer list and from its session's edge
 * list, clear the session's current-channel pointer (pc) if it refers to
 * the edge's channel, and free the edge.
 *
 * Either end of the edge may be NULL; the session is only touched when it
 * is present (the pc reset used to dereference it unconditionally).
 * Always returns 0.
 */
int delEdge (struct Edge *e)
{
    struct Channel *pchannel=e->head;
    struct Session *psession=e->me;
    struct Edge *pedge;

    if (pchannel)
    {
        if (pchannel->PeerHead == e) pchannel->PeerHead = e->cnext;
        else
        {
            /* Find the predecessor of e in the channel's list, if any. */
            for (pedge=pchannel->PeerHead; pedge && pedge->cnext && pedge->cnext != e; pedge = pedge->cnext);
            if (pedge && pedge->cnext)
                pedge->cnext = e->cnext;
        }
    }
    if (psession)
    {
        if (psession->header == e) psession->header = e->enext;
        else
        {
            /* Find the predecessor of e in the session's list, if any. */
            for (pedge=psession->header; pedge && pedge->enext && pedge->enext != e; pedge = pedge->enext);
            if (pedge && pedge->enext)
                pedge->enext = e->enext;
        }
        if (psession->pc == pchannel) psession->pc = NULL;
    }
    free (e);
    return 0;
}
/*
 * Call `apply(session, p)` for every entry of the `size`-element session
 * array `head` whose socket field is positive, in ascending index order.
 */
void apply_session (struct Session *head, int size, void apply(struct Session *, void *), void *p)
{
    int idx = 0;

    while (idx < size)
    {
        struct Session *entry = &(head[idx]);

        if (entry->socket > 0)
            apply (entry, p);
        idx++;
    }
}
/*
 * Rewrite the XML status file named by LOGXML with the current traffic
 * statistics.
 *
 *   time_interval - divisor for the "Current" rates (tmpDownBytes/tmpUpBytes)
 *   tmpTime       - divisor for the "Average" rates (totalDownBytes/totalUpBytes)
 *   channelcount  - value written as ActiveChannel
 *   totalclient   - value written as OnlineUser
 *
 * Rates are written in units of 1024 bytes per divisor unit.  The root
 * element is <SP> or <CP> depending on the build; the closing tag is
 * selected by the same macro as the opening one (the closing </CP> used to
 * be tied to __SP_SOURCE, which produced malformed XML in both builds).
 *
 * Returns 0 on success, -1 when LOGXML is unset/empty or cannot be opened.
 */
int logto_xml (unsigned int time_interval, unsigned int tmpTime, unsigned int channelcount, unsigned int totalclient)
{
    FILE *logf;

    if (LOGXML == NULL || LOGXML[0] == 0 || (logf = fopen(LOGXML,"w")) == NULL)
    {
        /* Never hand a NULL pointer to %s. */
        PDEBUG("Couldn't open xml log %s!.\n", LOGXML == NULL ? "(null)" : LOGXML);
        return -1;
    }
    fprintf(logf, "<?xml version=\"1.0\" encoding=\"iso-8859-1\"?>\n");
#ifdef __SP_SOURCE
    fprintf(logf, "<SP>\n");
#endif
#ifdef __CP_SOURCE
    fprintf(logf, "<CP>\n");
#endif
    fprintf(logf, "<ElapsedTime>%ld</ElapsedTime>\n", (long)(CurrentTime-startTime));
    fprintf(logf, "<CurrentIncoming>%.4f</CurrentIncoming>\n", ((float)tmpDownBytes)/1024/time_interval);
    fprintf(logf, "<CurrentOutgoing>%.4f</CurrentOutgoing>\n", ((float)tmpUpBytes)/1024/time_interval);
    fprintf(logf, "<AverageIncoming>%.4f</AverageIncoming>\n", ((float)totalDownBytes)/1024/tmpTime);
    fprintf(logf, "<AverageOutgoing>%.4f</AverageOutgoing>\n", ((float)totalUpBytes)/1024/tmpTime);
    /* Both counters are unsigned int, so print them with %u. */
    fprintf(logf, "<ActiveChannel>%u</ActiveChannel>\n", channelcount);
    fprintf(logf, "<OnlineUser>%u</OnlineUser>\n", totalclient);
    fprintf(logf, "<TotalIncoming>%lld</TotalIncoming>\n", totalDownBytes);
    fprintf(logf, "<TotalOutgoing>%lld</TotalOutgoing>\n", totalUpBytes);
#ifdef __SP_SOURCE
    fprintf(logf, "</SP>\n");
#endif
#ifdef __CP_SOURCE
    fprintf(logf, "</CP>\n");
#endif
    fclose(logf);
    return 0;
}
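/*
 * Code-viewer residue (keyboard shortcut help), not part of the program:
 *   Copy code:           Ctrl + C
 *   Search code:         Ctrl + F
 *   Full-screen mode:    F11
 *   Toggle theme:        Ctrl + Shift + D
 *   Show shortcuts:      ?
 *   Increase font size:  Ctrl + =
 *   Decrease font size:  Ctrl + -
 */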