⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 servent.cpp.svn-base

📁 这是和p2p相关的一份源码
💻 SVN-BASE
📖 第 1 页 / 共 2 页
字号:
// ------------------------------------------------// File : servent.cpp// Date: 4-apr-2002// Author: giles// Desc: //		Servents are the actual connections between clients. They do the handshaking,//		transfering of data and processing of GnuPackets. Each servent has one socket allocated//		to it on connect, it uses this to transfer all of its data.//// (c) 2002 peercast.org// ------------------------------------------------// This program is free software; you can redistribute it and/or modify// it under the terms of the GNU General Public License as published by// the Free Software Foundation; either version 2 of the License, or// (at your option) any later version.// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the// GNU General Public License for more details.// ------------------------------------------------#include <stdlib.h>#include "servent.h"#include "sys.h"#include "gnutella.h"#include "xml.h"#include "html.h"#include "http.h"#include "stats.h"#include "servmgr.h"#include "peercast.h"// -----------------------------------char *Servent::statusMsgs[]={        "NONE",		"CONNECTING",        "PROTOCOL",        "HANDSHAKE",        "CONNECTED",        "CLOSING",		"LISTENING",		"TIMEOUT",		"REFUSED",		"VERIFIED",		"ERROR",		"WAIT"};// -----------------------------------char *Servent::typeMsgs[]={		"NONE",		"ALLOCATED",        "OUT",        "IN",        "SERVER",		"CHECK",		"STREAM",        "LOOKUP"};// -----------------------------------bool	Servent::isPrivate() {	Host h = getHost();	return servMgr->isFiltered(ServFilter::F_PRIVATE,h);}// -----------------------------------bool	Servent::isAllowed(int a) {	Host h = getHost();	if (!h.isValid())		return false;	if (servMgr->isFiltered(ServFilter::F_BAN,h))		return false;	if (!servMgr->isFiltered(ServFilter::F_ALLOW,h))		return false;	return (allow&a)!=0;}// -----------------------------------void Servent::init(){	outPacketsPri.init(MAX_OUTPACKETS);	outPacketsNorm.init(MAX_OUTPACKETS);	seenIDs.init(MAX_HASH);	permAlloc = false;	sock = NULL;	reset();}// -----------------------------------void Servent::reset(bool doClose){	if (doClose)		close();	agent.clear();	sock = NULL;	outputBitrate = 0;	chanID.clear();	chanIndex = -1;	allow = ALLOW_ALL;	currPos = 0;	addMetadata = false;	pack.func = 255;	lastConnect = lastPing = lastPacket = 0;	loginPassword[0] = 0;	loginMount[0] = 0;	bytesPerSecond = 0;	outPacketsNorm.reset();	outPacketsPri.reset();	seenIDs.clearAll();	status = S_NONE;	if (!permAlloc)		type = T_NONE;}// -----------------------------------Host Servent::getHost(){	Host h(0,0);	if (sock)		h = sock->host;	return h;}// -----------------------------------bool Servent::outputPacket(GnuPacket &p, bool pri){	if (pri)		return outPacketsPri.write(p);	else		return outPacketsNorm.write(p);}// -----------------------------------bool Servent::initServer(Host &h){	try	{		checkFree();		status = S_WAIT;		createSocket();		sock->bind(h);		thread.data = this;		thread.func = serverProc;		type = T_SERVER;		if (!sys->startThread(&thread))			throw StreamException("Can`t start thread");	}catch(StreamException &e)	{		LOG_ERROR("Bad server: %s",e.msg);		reset();		return false;	}	return true;}// -----------------------------------void Servent::checkFree(){	if (sock)		throw StreamException("Socket already set");	if (thread.active)		throw StreamException("Thread already active");}// -----------------------------------void Servent::initIncoming(ClientSocket *s, unsigned int a){	try{		checkFree();		type = T_INCOMING;		sock = s;		allow = a;		thread.data = this;		thread.func = incomingProc;		setStatus(S_PROTOCOL);		if (!sys->startThread(&thread))		{			reset();			LOG_ERROR("Unable to start incoming thread");		}	}catch(StreamException &e)	{		LOG_ERROR("Incoming error: %s",e.msg);		reset();	}}// -----------------------------------void Servent::initOutgoing(char *hostName, int port, TYPE ty){	try 	{		checkFree();	    createSocket();		type = ty;		sock->timeout = 10000;	// 10 seconds to connect 		sock->open(hostName,port);		if (!isAllowed(ALLOW_SERVENT))			throw StreamException("Servent not allowed");		thread.data = this;		thread.func = outgoingProc;		if (!sys->startThread(&thread))			throw StreamException("Can`t start thread");	}catch(StreamException &e)	{		LOG_ERROR("Unable to open connection to %s:%d - %s",hostName,port,e.msg);		reset();	}}// -----------------------------------void Servent::initGiv(Host &h, int idx, GnuID &id){	char hostName[64];	h.IPtoStr(hostName);	try 	{		checkFree();		Channel *ch = chanMgr->findChannelByIndex(idx);		if (!ch)			throw StreamException("No channel");		chanID = ch->info.id;		chanIndex = idx;		pushID	= id;	    createSocket();		sock->open(hostName,h.port);		if (!isAllowed(ALLOW_DATA))			throw StreamException("Servent not allowed");				sock->connect();		thread.data = this;		thread.func = givProc;		type = T_STREAM;		if (!sys->startThread(&thread))			throw StreamException("Can`t start thread");	}catch(StreamException &e)	{		LOG_ERROR("Giv error: 0x%x to %s: %s",idx,hostName,e.msg);		reset();	}}// -----------------------------------void Servent::createSocket(){	if (sock)		LOG_ERROR("Servent::createSocket attempt made while active");	sock = sys->createSocket();}// -----------------------------------void Servent::endThread(){		reset(true);	thread.unlock();}// -----------------------------------void Servent::abort(){	thread.active = false;	if (sock)		sock->close();}// -----------------------------------void Servent::close(){		thread.active = false;			setStatus(S_CLOSING);	if (sock)	{		sock->close();		delete sock;		sock = NULL;	}}// -----------------------------------void Servent::setStatus(STATUS s){	if (s != status)	{		status = s;		if ((s == S_CONNECTED) || (s == S_LISTENING))			lastConnect = sys->getTime();	}}// -----------------------------------void Servent::handshakeOut(){	sock->timeout = 10000;	if (servMgr->allowGnutella)	    sock->writeLine(GNU_CONNECT);	else	    sock->writeLine(GNU_PEERCONN);    	sock->writeLine("%s %s",HTTP_HS_AGENT,PCX_AGENT);    sock->writeLine("%s %s",PCX_HS_SUBNET,servMgr->network.cstr());	char str[64];	servMgr->sessionID.toStr(str);	sock->writeLine("%s %s",PCX_HS_ID,str);	    sock->writeLine("%s %s",PCX_HS_OS,peercastApp->getClientTypeOS());		sock->writeLine("");	HTTP http(*sock);	http.checkResponse(200);	bool subnetValid = false;	bool versionValid = false;    while (http.nextHeader())    {		char *arg = http.getArgStr();		if (!arg)			continue;		if (http.isHeader(PCX_HS_SUBNET))			subnetValid = stricmp(arg,servMgr->network.cstr())==0;		else if (http.isHeader(HTTP_HS_AGENT))		{			agent.set(arg);			if (strnicmp(arg,"PeerCast/",9)==0)				versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0) && (stricmp(arg+9,MAX_CONNECTVER)<=0);		}		if (type == T_LOOKUP)		{			if (http.isHeader("remote-ip"))			{				if (!servMgr->forceIP[0])				{					Host nh;					nh.fromStr(arg);					if (nh.ip != servMgr->serverHost.ip)					{						LOG_DEBUG("Got new IP: %s",arg);						servMgr->serverHost.ip = nh.ip;					}				}			}else if (http.isHeader(PCX_HS_MSG))			{				if (strcmp(servMgr->rootMsg.cstr(),arg))				{					servMgr->rootMsg.set(arg);					peercastApp->notifyMessage(ServMgr::NT_PEERCAST,arg);				}			}else if (http.isHeader(PCX_HS_DL))			{				if (servMgr->downloadURL[0]==0)				{					strcpy(servMgr->downloadURL,arg);					peercastApp->notifyMessage(ServMgr::NT_UPGRADE,"There is a new version available, please click here to upgrade your client.");				}			}else if (http.isHeader(PCX_HS_FULLHIT))				chanMgr->fullHitFrequency = atoi(arg);			else if (http.isHeader(PCX_HS_BCTTL))				chanMgr->broadcastTTL = atoi(arg);		}    }	if ((stricmp(servMgr->network.cstr(),"peercast")!=0) && (!subnetValid))		throw HTTPException(HTTP_SC_NOTFOUND,404);	if (!versionValid)		throw HTTPException(HTTP_SC_UNAUTHORIZED,401);    sock->writeLine(GNU_OK);    sock->writeLine("");}// -----------------------------------void Servent::handshakeIn(){	sock->timeout = 10000;	int osType=0;	HTTP http(*sock);	bool subnetValid = false;	bool versionValid = false;	bool diffRootVer = false;    while (http.nextHeader())    {		char *arg = http.getArgStr();		if (!arg)			continue;		if (http.isHeader(PCX_HS_SUBNET))			subnetValid = stricmp(arg,servMgr->network)==0;		else if (http.isHeader(HTTP_HS_AGENT))		{			agent.set(arg);			if (strnicmp(arg,"PeerCast/",9)==0)			{				versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0) && (stricmp(arg+9,MAX_CONNECTVER)<=0);				diffRootVer = stricmp(arg+9,MIN_ROOTVER)<0;			}		}else if (http.isHeader(PCX_HS_ID))		{			GnuID id;			id.fromStr(arg);			if (id.isSame(servMgr->sessionID))				throw StreamException("Servent loopback");		}else if (http.isHeader(PCX_HS_OS))		{			if (stricmp(arg,PCX_OS_LINUXDYN)==0)				osType = 1;			else if (stricmp(arg,PCX_OS_LINUXSTA)==0)				osType = 2;			else if (stricmp(arg,PCX_OS_WIN32)==0)				osType = 3;			else if (stricmp(arg,PCX_OS_MACOSX)==0)				osType = 4;			else if (stricmp(arg,PCX_OS_WINAMP2)==0)				osType = 5;		}    }	if ((stricmp(servMgr->network.cstr(),"peercast")!=0) && (!subnetValid))		throw HTTPException(HTTP_SC_NOTFOUND,404);	if (!versionValid)	{		throw HTTPException(HTTP_SC_UNAUTHORIZED,401);	}    sock->writeLine(GNU_OK);    sock->writeLine("%s %s",HTTP_HS_AGENT,PCX_AGENT);    sock->writeLine("%s %s",PCX_HS_SUBNET,servMgr->network.cstr());	if (servMgr->isRoot)	{		sock->writeLine("%s %d",PCX_HS_BCTTL,9);		if (diffRootVer)		{			sock->writeString(PCX_HS_DL);			switch(osType)			{				case 1:					sock->writeLine(PCX_DL_LINUXDYN);					break;				case 2:					sock->writeLine(PCX_DL_LINUXSTA);					break;				case 3:					sock->writeLine(PCX_DL_WIN32);					break;				case 4:					sock->writeLine(PCX_DL_MACOSX);					break;				case 5:					sock->writeLine(PCX_DL_WINAMP2);					break;				default:					sock->writeLine(PCX_DL_URL);					break;			}		}		sock->writeLine("%s %s",PCX_HS_MSG,servMgr->rootMsg.cstr());	}	char hostIP[64];	Host h = sock->host;	h.IPtoStr(hostIP);    sock->writeLine("Remote-IP: %s",hostIP);    sock->writeLine("");	while (http.nextHeader());}// -----------------------------------void Servent::handshakeStream(Channel *ch, bool isRaw){	sock->timeout = 10000;	HTTP http(*sock);	char idStr[64];	ch->info.id.toStr(idStr);	while (http.nextHeader())	{		char *arg = http.getArgStr();		if (!arg)			continue;		if (http.isHeader(HTTP_HS_AGENT))			agent.set(arg);		if (ch->info.contentType == ChanInfo::T_MP3)			if (http.isHeader("icy-metadata"))				addMetadata = atoi(arg) > 0;		LOG_DEBUG("Stream: %s",http.cmdLine);	}	if (addMetadata && isRaw)		// winamp mp3 metadata check	{	    sock->writeLine(ICY_OK);		sock->writeLine("icy-name:%s",ch->getName());		sock->writeLine("icy-br:%d",ch->getBitrate());		sock->writeLine("icy-genre:%s",ch->info.genre.cstr());		sock->writeLine("icy-url:%s",ch->info.url.cstr());		sock->writeLine("%s %s",PCX_HS_CHANNELID,idStr);		sock->writeLine("icy-metaint:%d",chanMgr->icyMetaInterval);	    sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_MP3);	}else	{	    sock->writeLine(HTTP_SC_OK);		sock->writeLine("%s %s",HTTP_HS_SERVER,PCX_AGENT);		sock->writeLine("x-audiocast-name: %s",ch->getName());		sock->writeLine("x-audiocast-bitrate: %d",ch->getBitrate());		sock->writeLine("x-audiocast-genre: %s",ch->info.genre.cstr());		sock->writeLine("x-audiocast-description: %s",ch->info.desc.cstr());		sock->writeLine("x-audiocast-url: %s",ch->info.url.cstr());		sock->writeLine("%s %s",PCX_HS_CHANNELID,idStr);		if (isRaw)		{			switch (ch->info.contentType)			{				case ChanInfo::T_OGG:					sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_XOGG);					break;				case ChanInfo::T_MP3:					sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_MP3);					break;				case ChanInfo::T_MOV:					sock->writeLine("Accept-Ranges: bytes");					sock->writeLine("Connection: close");					sock->writeLine("Content-Length: 10000000");					sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_MOV);					break;				case ChanInfo::T_MPG:					sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_MPG);					break;			}				}else		{			sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_XPEERCAST);		}	}	    sock->writeLine("");}// -----------------------------------void Servent::handshakeGiv(Channel *ch){	sock->timeout = 10000;	char idstr[64];	pushID.toStr(idstr);    sock->writeLine("GIV %d:%s/%s",chanIndex,idstr,ch->info.name.cstr());	handshakeStream(ch,false);}// -----------------------------------void Servent::process(){	try 	{			gnuStream.init(sock);		setStatus(S_CONNECTED);		sock->timeout = 10000;		// 10 seconds		gnuStream.ping(2);		if (type != T_LOOKUP)			chanMgr->broadcastRelays(2,this);		lastPacket = lastPing = sys->getTime();		bool doneBigPing=false;		const unsigned int	abortTimeoutSecs = 60;		// abort connection after 60 secs of no activitiy		const unsigned int	packetTimeoutSecs = 30;		// ping connection after 30 secs of no activity		unsigned int currBytes=0,lastTotal=0;		unsigned int lastWait=0;		while (thread.active && sock->active())		{			if (sock->readPending())			{				lastPacket = sys->getTime();				if (gnuStream.readPacket(pack))				{					unsigned int ver = pack.id.getVersion();					char ipstr[64];					sock->host.toStr(ipstr);					GnuID routeID;					GnuStream::R_TYPE ret = GnuStream::R_PROCESS;					if ((ver < MIN_PACKETVER) || (ver >= MAX_PACKETVER))						ret = GnuStream::R_BADVERSION;					if (pack.func != GNU_FUNC_PONG)						if (servMgr->seenPacket(pack))							ret = GnuStream::R_DUPLICATE;					seenIDs.addGnuID(pack.id);					servMgr->addVersion(ver);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -