⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 servent.cpp.svn-base

📁 这是和p2p相关的一份源码
💻 SVN-BASE
📖 第 1 页 / 共 4 页
字号:
// ------------------------------------------------// File : servent.cpp// Date: 4-apr-2002// Author: giles// Desc: //		Servents are the actual connections between clients. They do the handshaking,//		transfering of data and processing of GnuPackets. Each servent has one socket allocated//		to it on connect, it uses this to transfer all of its data.//// (c) 2002 peercast.org// ------------------------------------------------// This program is free software; you can redistribute it and/or modify// it under the terms of the GNU General Public License as published by// the Free Software Foundation; either version 2 of the License, or// (at your option) any later version.// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the// GNU General Public License for more details.// ------------------------------------------------#include <stdlib.h>#include "servent.h"#include "sys.h"#include "gnutella.h"#include "xml.h"#include "html.h"#include "http.h"#include "stats.h"#include "servmgr.h"#include "peercast.h"#include "atom.h"
#include "pcp.h"
// -----------------------------------char *Servent::statusMsgs[]={        "NONE",		"CONNECTING",        "PROTOCOL",        "HANDSHAKE",        "CONNECTED",        "CLOSING",		"LISTENING",		"TIMEOUT",		"REFUSED",		"VERIFIED",		"ERROR",		"WAIT"};// -----------------------------------char *Servent::typeMsgs[]={
		"NONE",        "INCOMING",
        "SERVER",		"STREAM",		"COUT",
		"CIN",
		"PGNU"
};// -----------------------------------bool	Servent::isPrivate() {	Host h = getHost();	return servMgr->isFiltered(ServFilter::F_PRIVATE,h);}// -----------------------------------bool	Servent::isAllowed(int a) {	Host h = getHost();	if (!h.isValid())		return false;	if (servMgr->isFiltered(ServFilter::F_BAN,h))		return false;	if (!servMgr->isFiltered(ServFilter::F_NETWORK,h))		return false;	return (allow&a)!=0;}// -----------------------------------Servent::Servent(int id):outPacketsPri(MAX_OUTPACKETS),outPacketsNorm(MAX_OUTPACKETS),seenIDs(MAX_HASH),serventID(id),sock(NULL),next(NULL){	reset();}// -----------------------------------Servent::~Servent(){		}// -----------------------------------void	Servent::kill() {

	thread.unlock();	close();
	if (type != T_SERVER)	{		type = T_NONE;		setStatus(S_DEAD);	}
}// -----------------------------------void	Servent::abort() {	thread.active = false;	if (sock)		sock->close();}// -----------------------------------void Servent::reset(){

	remoteID.clear();


	pcpStream.init();

	flowControl = false;	networkID.clear();
	chanID.clear();

	outputProtocol = ChanInfo::SP_UNKNOWN;
	agent.clear();	sock = NULL;	allow = ALLOW_ALL;	syncPos = 0;	addMetadata = false;
	nsSwitchNum = 0;
	pack.func = 255;	lastConnect = lastPing = lastPacket = 0;	loginPassword[0] = 0;	loginMount[0] = 0;	bytesPerSecond = 0;	priorityConnect = false;
	pushSock = NULL;	outPacketsNorm.reset();	outPacketsPri.reset();	seenIDs.clearAll();	status = S_NONE;	type = T_NONE;}
// -----------------------------------
// Writes `pack` to this servent's PCP output stream when the servent matches
// the given filters, and returns the result of that write. Returns false
// without writing when any filter rejects the servent.
//   t   - servent type that must equal this servent's type
//   cid - if set, must be the same as this servent's channel ID
//   did - if set, must be the same as this servent's remote ID
//   sid - if set, must NOT be the same as this servent's remote ID
bool Servent::sendPacket(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did,Servent::TYPE t)
{
	if (type != t)
		return false;

	if (!isConnected())
		return false;

	if (cid.isSet() && !chanID.isSame(cid))
		return false;

	if (did.isSet() && !remoteID.isSame(did))
		return false;

	if (sid.isSet() && sid.isSame(remoteID))
		return false;

	return pcpStream.outData.writePacket(pack);
}


// -----------------------------------
// Stores `givSock` as this servent's push socket if none is stored yet.
// Returns true when the socket was taken, false when a push socket was
// already present (in which case `givSock` is left untouched).
bool Servent::acceptGIV(ClientSocket *givSock)
{
	if (pushSock)
		return false;

	pushSock = givSock;
	return true;
}
// -----------------------------------Host Servent::getHost(){	Host h(0,0);	if (sock)		h = sock->host;	return h;}// -----------------------------------bool Servent::outputPacket(GnuPacket &p, bool pri){	lock.on();	bool r=false;	if (pri)		r = outPacketsPri.write(p);	else	{		if (servMgr->useFlowControl)		{			int per = outPacketsNorm.percentFull();			if (per > 50)				flowControl = true;			else if (per < 25)				flowControl = false;		}		bool send=true;		if (flowControl)		{			// if in flowcontrol, only allow packets with less of a hop count than already in queue			if (p.hops >= outPacketsNorm.findMinHop())				send = false;		}		if (send)			r = outPacketsNorm.write(p);	}	lock.off();	return r;}// -----------------------------------bool Servent::initServer(Host &h){	try	{		checkFree();		status = S_WAIT;		createSocket();		sock->bind(h);		thread.data = this;		thread.func = serverProc;		type = T_SERVER;		if (!sys->startThread(&thread))			throw StreamException("Can`t start thread");	}catch(StreamException &e)	{		LOG_ERROR("Bad server: %s",e.msg);		kill();		return false;	}	return true;}// -----------------------------------void Servent::checkFree(){	if (sock)		throw StreamException("Socket already set");	if (thread.active)		throw StreamException("Thread already active");}// -----------------------------------void Servent::initIncoming(ClientSocket *s, unsigned int a){	try{		checkFree();		type = T_INCOMING;		sock = s;		allow = a;		thread.data = this;		thread.func = incomingProc;		setStatus(S_PROTOCOL);

		char ipStr[64];
		sock->host.toStr(ipStr);
		LOG_DEBUG("Incoming from %s",ipStr);
		if (!sys->startThread(&thread))			throw StreamException("Can`t start thread");	}catch(StreamException &e)	{		LOG_ERROR("Incoming error: %s",e.msg);		kill();	}}// -----------------------------------void Servent::initOutgoing(TYPE ty){	try 	{		checkFree();		type = ty;		thread.data = this;		thread.func = outgoingProc;		if (!sys->startThread(&thread))			throw StreamException("Can`t start thread");	}catch(StreamException &e)	{		LOG_ERROR("Unable to start outgoing: %s",e.msg);		kill();	}}
// -----------------------------------
// Sets this servent up as a T_COUT servent for host `rh`: creates a socket,
// opens it for `rh`, checks that the servent passes ALLOW_SERVENT and starts
// outgoingProc on the servent's thread.
// On StreamException the error is logged together with the host text and
// the servent is killed.
void Servent::initPCP(Host &rh)
{
	// Rendered before the try block so the catch handler can log it too.
	char hostText[64];
	rh.toStr(hostText);

	try
	{
		checkFree();
		createSocket();

		type = T_COUT;
		sock->open(rh);

		const bool permitted = isAllowed(ALLOW_SERVENT);
		if (!permitted)
		{
			throw StreamException("Servent not allowed");
		}

		thread.data = this;
		thread.func = outgoingProc;

		LOG_DEBUG("Outgoing to %s",hostText);

		const bool started = sys->startThread(&thread);
		if (!started)
		{
			throw StreamException("Can`t start thread");
		}
	}
	catch (StreamException &err)
	{
		LOG_ERROR("Unable to open connection to %s - %s",hostText,err.msg);
		kill();
	}
}

#if 0
// -----------------------------------
// NOTE: this function is compiled out by the surrounding #if 0.
//
// Sets the servent type to T_STREAM, creates a socket, opens it for `host`,
// checks that the servent passes ALLOW_DATA and then connects the socket.
// Unlike the other init* functions in this file there is no try/catch here,
// so a StreamException (thrown below, or by checkFree()) reaches the caller.
void	Servent::initChannelFetch(Host &host)
{
	// The type is assigned before checkFree() runs.
	type = T_STREAM;

	// ipStr is filled in but not used anywhere else in this function.
	char ipStr[64];
	host.toStr(ipStr);

	checkFree();
	 
	createSocket();
		
	sock->open(host);

		
	if (!isAllowed(ALLOW_DATA))	
		throw StreamException("Servent not allowed");
		
	sock->connect();
}
#endif

// -----------------------------------void Servent::initGIV(Host &h, GnuID &id){	char ipStr[64];	h.toStr(ipStr);	try 	{		checkFree();		givID = id;	    createSocket();		sock->open(h);		if (!isAllowed(ALLOW_DATA))			throw StreamException("Servent not allowed");				sock->connect();		thread.data = this;		thread.func = givProc;		type = T_STREAM;		if (!sys->startThread(&thread))			throw StreamException("Can`t start thread");	}catch(StreamException &e)	{		LOG_ERROR("GIV error to %s: %s",ipStr,e.msg);		kill();	}}// -----------------------------------void Servent::createSocket(){	if (sock)		LOG_ERROR("Servent::createSocket attempt made while active");	sock = sys->createSocket();}// -----------------------------------void Servent::close(){		thread.active = false;		
	setStatus(S_CLOSING);	if (sock)	{
		sock->close();		delete sock;		sock = NULL;	}

	if (pushSock)
	{
		pushSock->close();
		delete pushSock;
		pushSock = NULL;
	}
}// -----------------------------------void Servent::setStatus(STATUS s){	if (s != status)	{		status = s;		if ((s == S_HANDSHAKE) || (s == S_CONNECTED) || (s == S_LISTENING))			lastConnect = sys->getTime();	}}
#if 0
// -----------------------------------
// NOTE: this function is compiled out by the surrounding #if 0.
//
// Sends a "GET /channel/<id>" request with PCP handshake headers over `sock`,
// reads the HTTP response and its headers, and returns a newly allocated
// PCPStream.
//   chanHit - hit describing the remote host; its `host` and `tracker`
//             members decide which optional headers are sent
//   spos    - stream position sent to the remote side in the PCX_HS_POS header
//   cid     - channel ID; stored in chanID and used in the request line
// NOTE(review): this assigns a `new PCPStream` to pcpStream, while
// Servent::reset() calls pcpStream.init() on it as an object - this disabled
// code looks out of date with the current member type; confirm before
// re-enabling.
ChannelStream *Servent::handshakeFetch(ChanHit &chanHit, unsigned spos, GnuID &cid)
{
	
	chanID = cid;

	streamPos = spos;

	setStatus(S_HANDSHAKE);
	char idStr[64];
	cid.toStr(idStr);

	char sidStr[64];
	servMgr->sessionID.toStr(sidStr);


	// request line followed by the handshake headers
	sock->writeLine("GET /channel/%s HTTP/1.0",idStr);
	sock->writeLine("%s %s",HTTP_HS_AGENT,PCX_AGENT);
	sock->writeLine("%s %d",PCX_HS_PCP,1);
	sock->writeLine("%s %s",PCX_HS_SESSIONID,sidStr);

	// the port header is only sent when servMgr reports the remote host can connect back
	if (servMgr->canConnectToMe(chanHit.host))
		sock->writeLine("%s %d",PCX_HS_PORT,servMgr->serverHost.port);

	// ask to check me for incoming connections
	if ((chanHit.tracker) && (servMgr->getFirewall() == ServMgr::FW_UNKNOWN))
		sock->writeLine("%s %d",PCX_HS_PINGME,servMgr->serverHost.port);

	// ask for last known packet pos
	sock->writeLine("%s %d",PCX_HS_POS,streamPos);

	// an empty line ends the request headers
	sock->writeLine("");


	HTTP http(*sock);

	// the response code is stored in r but not examined afterwards
	int r = http.readResponse();

	ChanInfo info;
	while (http.nextHeader())
	{
		Servent::readICYHeader(http, info, NULL,chanHit.tracker);
		LOG_CHANNEL("Channel fetch: %s",http.cmdLine);
	}

	// take over the stream position collected in `info` while reading the headers
	streamPos = info.streamPos;

	setStatus(S_CONNECTED);

	pcpStream = new PCPStream(PCP_BCST_UP,PCP_BCST_DOWN);

	return pcpStream;
}
#endif

// -----------------------------------void Servent::handshakeOut(){    sock->writeLine(GNU_PEERCONN);	char str[64];    	sock->writeLine("%s %s",HTTP_HS_AGENT,PCX_AGENT);    sock->writeLine("%s %d",PCX_HS_PCP,1);
	if (priorityConnect)	    sock->writeLine("%s %d",PCX_HS_PRIORITY,1);		if (networkID.isSet())	{		networkID.toStr(str);		sock->writeLine("%s %s",PCX_HS_NETWORKID,str);	}	servMgr->sessionID.toStr(str);	sock->writeLine("%s %s",PCX_HS_ID,str);	    sock->writeLine("%s %s",PCX_HS_OS,peercastApp->getClientTypeOS());		sock->writeLine("");	HTTP http(*sock);
	int r = http.readResponse();

	if (r != 200)
	{
		LOG_ERROR("Expected 200, got %d",r);
		throw StreamException("Unexpected HTTP response");
	}
	bool versionValid = false;	GnuID clientID;	clientID.clear();    while (http.nextHeader())    {		LOG_DEBUG(http.cmdLine);		char *arg = http.getArgStr();		if (!arg)			continue;		if (http.isHeader(HTTP_HS_AGENT))		{			agent.set(arg);			if (strnicmp(arg,"PeerCast/",9)==0)				versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0);		}else if (http.isHeader(PCX_HS_NETWORKID))			clientID.fromStr(arg);    }
	if (!clientID.isSame(networkID))		throw HTTPException(HTTP_SC_UNAVAILABLE,503);	if (!versionValid)		throw HTTPException(HTTP_SC_UNAUTHORIZED,401);    sock->writeLine(GNU_OK);    sock->writeLine("");

}

// -----------------------------------
// Currently a no-op: the function body is empty.
void Servent::processOutChannel()
{
}


// -----------------------------------void Servent::handshakeIn(){

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -