⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 servent.cpp

📁 P2P应用 : Peercast的源代码
💻 CPP
📖 第 1 页 / 共 4 页
字号:
// ------------------------------------------------
// File : servent.cpp
// Date: 4-apr-2002
// Author: giles
// Desc: 
//		Servents are the actual connections between clients. They do the handshaking,
//		transferring of data and processing of GnuPackets. Each servent has one socket allocated
//		to it on connect, it uses this to transfer all of its data.
//
// (c) 2002 peercast.org
// ------------------------------------------------
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
// ------------------------------------------------
// todo: make lan->yp not check firewall

#include <stdlib.h>
#include "servent.h"
#include "sys.h"
#include "gnutella.h"
#include "xml.h"
#include "html.h"
#include "http.h"
#include "stats.h"
#include "servmgr.h"
#include "peercast.h"
#include "atom.h"
#include "pcp.h"
#include "version2.h"


const int DIRECT_WRITE_TIMEOUT = 60;// -----------------------------------char *Servent::statusMsgs[]={        "NONE",		"CONNECTING",        "PROTOCOL",        "HANDSHAKE",        "CONNECTED",        "CLOSING",		"LISTENING",		"TIMEOUT",		"REFUSED",		"VERIFIED",		"ERROR",		"WAIT",
		"FREE"};// -----------------------------------char *Servent::typeMsgs[]={
		"NONE",        "INCOMING",
        "SERVER",		"RELAY",		"DIRECT",
		"COUT",
		"CIN",
		"PGNU"
};// -----------------------------------bool	Servent::isPrivate() {	Host h = getHost();	return servMgr->isFiltered(ServFilter::F_PRIVATE,h) || h.isLocalhost();}// -----------------------------------bool	Servent::isAllowed(int a) {	Host h = getHost();	if (servMgr->isFiltered(ServFilter::F_BAN,h))		return false;	return (allow&a)!=0;}
// -----------------------------------
bool	Servent::isFiltered(int f) 
{
	Host h = getHost();
	return servMgr->isFiltered(f,h);
}
// -----------------------------------Servent::Servent(int index):outPacketsPri(MAX_OUTPACKETS),outPacketsNorm(MAX_OUTPACKETS),seenIDs(MAX_HASH),serventIndex(index),sock(NULL),next(NULL){	reset();}// -----------------------------------Servent::~Servent(){		}// -----------------------------------void	Servent::kill() {
	thread.active = false;
		
	setStatus(S_CLOSING);

	if (pcpStream)
	{
		PCPStream *pcp = pcpStream;
		pcpStream = NULL;
		pcp->kill();
		delete pcp;
	}


	if (sock)
	{
		sock->close();
		delete sock;
		sock = NULL;
	}

	if (pushSock)
	{
		pushSock->close();
		delete pushSock;
		pushSock = NULL;
	}
//	thread.unlock();

	if (type != T_SERVER)	{		reset();		setStatus(S_FREE);	}
}// -----------------------------------void	Servent::abort() {	thread.active = false;	if (sock)
	{		sock->close();
	}}// -----------------------------------void Servent::reset(){

	remoteID.clear();

	servPort = 0;

	pcpStream = NULL;

	flowControl = false;	networkID.clear();
	chanID.clear();

	outputProtocol = ChanInfo::SP_UNKNOWN;
	agent.clear();	sock = NULL;	allow = ALLOW_ALL;	syncPos = 0;	addMetadata = false;
	nsSwitchNum = 0;
	pack.func = 255;	lastConnect = lastPing = lastPacket = 0;	loginPassword[0] = 0;	loginMount[0] = 0;	bytesPerSecond = 0;	priorityConnect = false;
	pushSock = NULL;
	sendHeader = true;	outPacketsNorm.reset();	outPacketsPri.reset();	seenIDs.clear();	status = S_NONE;	type = T_NONE;}
// -----------------------------------
// Forwards `pack` to this servent's PCP stream, addressed to `did`, provided
// that all of the following hold (checked in this order):
//   - the servent is of type `t` and is connected,
//   - `cid` is unset or equals this servent's channel ID,
//   - `sid` is unset or differs from the remote ID (never echo to the source),
//   - a PCP stream exists.
// Returns the stream's sendPacket() result, or false when any check fails.
bool Servent::sendPacket(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did,Servent::TYPE t)
{
	if (type != t)
		return false;

	if (!isConnected())
		return false;

	if (cid.isSet() && !chanID.isSame(cid))
		return false;

	if (sid.isSet() && sid.isSame(remoteID))
		return false;

	if (pcpStream == NULL)
		return false;

	return pcpStream->sendPacket(pack,did);
}


// -----------------------------------
// Stores `givSock` as this servent's push socket if none is held yet.
// Returns true when the socket was taken, false when one was already present
// (in which case `givSock` is left untouched).
bool Servent::acceptGIV(ClientSocket *givSock)
{
	if (pushSock)
		return false;

	pushSock = givSock;
	return true;
}
// -----------------------------------Host Servent::getHost(){	Host h(0,0);	if (sock)		h = sock->host;	return h;}// -----------------------------------bool Servent::outputPacket(GnuPacket &p, bool pri){	lock.on();	bool r=false;	if (pri)		r = outPacketsPri.write(p);	else	{		if (servMgr->useFlowControl)		{			int per = outPacketsNorm.percentFull();			if (per > 50)				flowControl = true;			else if (per < 25)				flowControl = false;		}		bool send=true;		if (flowControl)		{			// if in flowcontrol, only allow packets with less of a hop count than already in queue			if (p.hops >= outPacketsNorm.findMinHop())				send = false;		}		if (send)			r = outPacketsNorm.write(p);	}	lock.off();	return r;}// -----------------------------------bool Servent::initServer(Host &h){	try	{		checkFree();		status = S_WAIT;		createSocket();		sock->bind(h);		thread.data = this;		thread.func = serverProc;

		type = T_SERVER;		if (!sys->startThread(&thread))			throw StreamException("Can`t start thread");	}catch(StreamException &e)	{		LOG_ERROR("Bad server: %s",e.msg);		kill();		return false;	}	return true;}// -----------------------------------void Servent::checkFree(){	if (sock)		throw StreamException("Socket already set");	if (thread.active)		throw StreamException("Thread already active");}// -----------------------------------void Servent::initIncoming(ClientSocket *s, unsigned int a){	try{		checkFree();		type = T_INCOMING;		sock = s;		allow = a;		thread.data = this;		thread.func = incomingProc;		setStatus(S_PROTOCOL);

		char ipStr[64];
		sock->host.toStr(ipStr);
		LOG_DEBUG("Incoming from %s",ipStr);
		if (!sys->startThread(&thread))			throw StreamException("Can`t start thread");	}catch(StreamException &e)	{		//LOG_ERROR("!!FATAL!! Incoming error: %s",e.msg);		//servMgr->shutdownTimer = 1;  	
		kill();

		LOG_ERROR("INCOMING FAILED: %s",e.msg);
	}}// -----------------------------------void Servent::initOutgoing(TYPE ty){	try 	{		checkFree();		type = ty;		thread.data = this;		thread.func = outgoingProc;		if (!sys->startThread(&thread))			throw StreamException("Can`t start thread");	}catch(StreamException &e)	{		LOG_ERROR("Unable to start outgoing: %s",e.msg);		kill();	}}
// -----------------------------------
// Sets this servent up as an outgoing PCP connection (T_COUT) to `rh`:
// creates a socket, opens it on the host, checks ALLOW_NETWORK permission and
// starts outgoingProc on the servent thread.
// Any StreamException is logged together with the target address and the
// servent is killed; nothing is reported back to the caller.
void Servent::initPCP(Host &rh)
{
	// Formatted up front so the address is available to the catch handler too.
	char hostStr[64];
	rh.toStr(hostStr);

	try
	{
		checkFree();

		createSocket();

		type = T_COUT;

		sock->open(rh);

		const bool permitted = isAllowed(ALLOW_NETWORK);
		if (!permitted)
			throw StreamException("Servent not allowed");

		thread.data = this;
		thread.func = outgoingProc;

		LOG_DEBUG("Outgoing to %s",hostStr);

		const bool started = sys->startThread(&thread);
		if (!started)
			throw StreamException("Can`t start thread");
	}
	catch(StreamException &e)
	{
		LOG_ERROR("Unable to open connection to %s - %s",hostStr,e.msg);
		kill();
	}
}

#if 0
// -----------------------------------
// DISABLED: everything between #if 0 and #endif is compiled out.
// When enabled, this would open a socket to `host`, require ALLOW_DATA
// permission and connect. Unlike the other init* functions in this file it has
// no try/catch, so a StreamException would propagate to the caller.
// NOTE(review): T_STREAM has no matching entry in typeMsgs in this file, and
// ipStr is filled in but never used - check both before re-enabling.
void	Servent::initChannelFetch(Host &host)
{
	type = T_STREAM;

	char ipStr[64];
	host.toStr(ipStr);

	checkFree();
	 
	createSocket();
		
	sock->open(host);

		
	if (!isAllowed(ALLOW_DATA))	
		throw StreamException("Servent not allowed");
		
	sock->connect();
}
#endif

// -----------------------------------void Servent::initGIV(Host &h, GnuID &id){	char ipStr[64];	h.toStr(ipStr);	try 	{		checkFree();		givID = id;	    createSocket();		sock->open(h);		if (!isAllowed(ALLOW_NETWORK))			throw StreamException("Servent not allowed");				sock->connect();		thread.data = this;		thread.func = givProc;		type = T_RELAY;		if (!sys->startThread(&thread))			throw StreamException("Can`t start thread");	}catch(StreamException &e)	{		LOG_ERROR("GIV error to %s: %s",ipStr,e.msg);		kill();	}}// -----------------------------------void Servent::createSocket(){	if (sock)		LOG_ERROR("Servent::createSocket attempt made while active");	sock = sys->createSocket();}// -----------------------------------void Servent::setStatus(STATUS s){	if (s != status)	{		status = s;		if ((s == S_HANDSHAKE) || (s == S_CONNECTED) || (s == S_LISTENING))			lastConnect = sys->getTime();	}}
// -----------------------------------void Servent::handshakeOut(){    sock->writeLine(GNU_PEERCONN);	char str[64];    	sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_AGENT);    sock->writeLineF("%s %d",PCX_HS_PCP,1);
	if (priorityConnect)	    sock->writeLineF("%s %d",PCX_HS_PRIORITY,1);		if (networkID.isSet())	{		networkID.toStr(str);		sock->writeLineF("%s %s",PCX_HS_NETWORKID,str);	}	servMgr->sessionID.toStr(str);	sock->writeLineF("%s %s",PCX_HS_ID,str);	    sock->writeLineF("%s %s",PCX_HS_OS,peercastApp->getClientTypeOS());		sock->writeLine("");	HTTP http(*sock);
	int r = http.readResponse();

	if (r != 200)
	{
		LOG_ERROR("Expected 200, got %d",r);
		throw StreamException("Unexpected HTTP response");
	}
	bool versionValid = false;	GnuID clientID;	clientID.clear();    while (http.nextHeader())    {		LOG_DEBUG(http.cmdLine);		char *arg = http.getArgStr();		if (!arg)			continue;		if (http.isHeader(HTTP_HS_AGENT))		{			agent.set(arg);			if (strnicmp(arg,"PeerCast/",9)==0)				versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0);		}else if (http.isHeader(PCX_HS_NETWORKID))			clientID.fromStr(arg);    }
	if (!clientID.isSame(networkID))		throw HTTPException(HTTP_SC_UNAVAILABLE,503);	if (!versionValid)		throw HTTPException(HTTP_SC_UNAUTHORIZED,401);    sock->writeLine(GNU_OK);    sock->writeLine("");

}

// -----------------------------------
// Intentionally empty: this function currently does nothing.
void Servent::processOutChannel()
{
}


// -----------------------------------void Servent::handshakeIn(){	int osType=0;	HTTP http(*sock);	bool versionValid = false;	bool diffRootVer = false;
	GnuID clientID;	clientID.clear();    while (http.nextHeader())    {		LOG_DEBUG("%s",http.cmdLine);		char *arg = http.getArgStr();		if (!arg)			continue;		if (http.isHeader(HTTP_HS_AGENT))		{			agent.set(arg);			if (strnicmp(arg,"PeerCast/",9)==0)			{				versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0);				diffRootVer = stricmp(arg+9,MIN_ROOTVER)<0;			}		}else if (http.isHeader(PCX_HS_NETWORKID))		{			clientID.fromStr(arg);		}else if (http.isHeader(PCX_HS_PRIORITY))		{			priorityConnect = atoi(arg)!=0;
		}else if (http.isHeader(PCX_HS_ID))		{			GnuID id;			id.fromStr(arg);			if (id.isSame(servMgr->sessionID))				throw StreamException("Servent loopback");		}else if (http.isHeader(PCX_HS_OS))		{			if (stricmp(arg,PCX_OS_LINUX)==0)				osType = 1;			else if (stricmp(arg,PCX_OS_WIN32)==0)				osType = 2;			else if (stricmp(arg,PCX_OS_MACOSX)==0)				osType = 3;			else if (stricmp(arg,PCX_OS_WINAMP2)==0)				osType = 4;		}    }

	if (!clientID.isSame(networkID))		throw HTTPException(HTTP_SC_UNAVAILABLE,503);	// if this is a priority connection and all incoming connections 	// are full then kill an old connection to make room. Otherwise reject connection.	//if (!priorityConnect)	{		if (!isPrivate())			if (servMgr->pubInFull())				throw HTTPException(HTTP_SC_UNAVAILABLE,503);	}	if (!versionValid)		throw HTTPException(HTTP_SC_FORBIDDEN,403);    sock->writeLine(GNU_OK);    sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_OLDAGENT);	if (networkID.isSet())	{		char idStr[64];		networkID.toStr(idStr);		sock->writeLineF("%s %s",PCX_HS_NETWORKID,idStr);	}	if (servMgr->isRoot)	{		sock->writeLineF("%s %d",PCX_HS_FLOWCTL,servMgr->useFlowControl?1:0);		sock->writeLineF("%s %d",PCX_HS_MINBCTTL,chanMgr->minBroadcastTTL);		sock->writeLineF("%s %d",PCX_HS_MAXBCTTL,chanMgr->maxBroadcastTTL);		sock->writeLineF("%s %d",PCX_HS_RELAYBC,servMgr->relayBroadcast);		//sock->writeLine("%s %d",PCX_HS_FULLHIT,2);		if (diffRootVer)		{			sock->writeString(PCX_HS_DL);			sock->writeLine(PCX_DL_URL);		}		sock->writeLineF("%s %s",PCX_HS_MSG,servMgr->rootMsg.cstr());	}	char hostIP[64];	Host h = sock->host;	h.IPtoStr(hostIP);    sock->writeLineF("%s %s",PCX_HS_REMOTEIP,hostIP);    sock->writeLine("");	while (http.nextHeader());
}
// -----------------------------------
bool	Servent::pingHost(Host &rhost,GnuID &rsid)
{
	char ipstr[64];
	rhost.toStr(ipstr);
	LOG_DEBUG("Ping host %s: trying..",ipstr);
	ClientSocket *s=NULL;
	bool hostOK=false;
	try
	{
		s = sys->createSocket();
		if (!s)
			return false;
		else
		{

			s->setReadTimeout(15000);
			s->setWriteTimeout(15000);
			s->open(rhost);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -