⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dlif.cpp.bak

📁 1)安装sp2补丁. 2)安装中文语言包. 3)关闭除系统盘除外的系统还原 4)控制面板->区域和语言选项->区域选项 选择中国,位置:选择中国.高级选项卡 非unicode程序的语
💻 BAK
字号:

#include "minifc.h"
#include "hammerif.h"
#include "dlif.h"
#include "smcic.h"
#include "config.h"

extern DLConfig *g_config;
extern CMonitorServer *g_mon;

// SRPC procedure table for the download interface.
// The only active entry maps the remote name "downif_adddown" onto
// dlif_add_downque(). Its signature string "issiiiss" has one character per
// request item and lines up with the order in which the handler reads them
// (GetItemLong for the 'i' slots, GetItemString for the 's' slots).
// NOTE(review): the meaning of the fourth (NULL) field is defined by
// struct srpc_if, which is not visible in this file.
struct srpc_if dlif[] =
{
	// Per-connection open/close hooks are currently disabled:
	//   {SRPC_OPEN,  "", (void*)downif_open,  0 },
	//   {SRPC_CLOSE, "", (void*)downif_close, 0 },
	{
		"downif_adddown",
		"issiiiss",
		(void*)dlif_add_downque,
		NULL
	}
};

//typedef struct downif_clnt
//{
//	CString addr;
//	UINT port;
//} DOWNIF_CLNT;


//int __stdcall downif_open(JThread *th,StdSocket *clnt,superint_ut *ud)
//{
//	DOWNIF_CLNT *pclnt=new DOWNIF_CLNT;
//
//	clnt->GetPeerName(pclnt->addr, pclnt->port);
//	*ud=(superint_ut)pclnt;
//
//	return PROC_CALL_OK;
//}
//
//int __stdcall downif_close(JThread *th,StdSocket *clnt,superint_ut *ud)
//{
//	DOWNIF_CLNT	*pclnt=(DOWNIF_CLNT *)(*ud);
//	delete pclnt;
//
//	return PROC_CALL_OK;
//}
//
//
// SRPC handler registered as "downif_adddown": queues one download request.
//
// Request items read from `arg`, in order:
//   0 uid (long)       1 mfid (string)     2 pipeid (string)
//   3 startpos (long)  4 endpos (long)     5 size (long)
//   6 gettype (string; only its first character is used, 'g' when empty)
//   7 pipename (string)
// The caller's address, obtained from context->sock->GetPeerName(), is passed
// along to g_config->downque_insert() together with the request fields.
//
// Reply written to `result`: exactly one long item, 1 when downque_insert()
// reported success and 0 otherwise (including when g_config is not set).
//
// `ud` and `strError` are not used. Always returns PROC_CALL_OK.
int __stdcall dlif_add_downque(SRPC_CONTEXT *context,superint_ut ud,NetDataTuple *arg,NetDataTuple *result,CString &strError)
{
	int uid,startpos,endpos,size;
	CString mfid,pipeid,pipename,gtype;
	char gettype;

	// Handed to downque_insert(); initialised so that no indeterminate value
	// is ever passed on. It is not read back in this function.
	superint_ut rowid=0;

	uid=arg->GetItemLong(0);
	mfid=arg->GetItemString(1);
	pipeid=arg->GetItemString(2);
	startpos=arg->GetItemLong(3);
	endpos=arg->GetItemLong(4);
	size=arg->GetItemLong(5);
	gtype=arg->GetItemString(6);
	pipename=arg->GetItemString(7);

	// Only the first character of the type string matters; default is 'g'.
	if(gtype.GetLength())
		gettype=gtype.GetAt(0);
	else
		gettype='g';

	ASSERT( g_config);

	CString clntip;
	UINT port;

	context->sock->GetPeerName(clntip,port);

	result->Clear();
	result->AddItems(1);

	// ASSERT is compiled out of release builds, so check again at run time and
	// answer with the ordinary failure reply instead of dereferencing NULL.
	if(!g_config)
	{
		result->SetItemLong(0,0);
		return PROC_CALL_OK;
	}

	if( g_config->downque_insert(uid,mfid,pipeid,pipename,startpos,endpos,gettype,size,clntip,rowid))
		result->SetItemLong(0,1);
	else
		result->SetItemLong(0,0);

	return PROC_CALL_OK;
}

// Tells the download worker whether more data should be fetched for
// `pipename` on `hammer`.
// Placeholder: the intended rule (look at how far the hammer's stdout pipe has
// been consumed before filling it further) is not implemented, so both
// arguments are ignored and every pipe is reported as needing data.
BOOL isNeedDownload(BaseNetClient *hammer, LPCSTR pipename)
{
	const BOOL needsData=TRUE;
	return needsData;
}


// Reference file that CheckBuffer() seeks and reads for its byte-for-byte
// comparison. Nothing in this file assigns it: the code in th_dlmgr that would
// create and open it is commented out.
CFile	*fp;


// Debug aid used as ASSERT(CheckBuffer(curpos,buf)) by the download worker.
//
// By default the check is switched off and the function returns TRUE without
// looking at its arguments (this is what the previous unconditional
// `return TRUE;` at the top of the body did). Define
// DLIF_VERIFY_DOWNLOAD_BUFFER to enable the verification, which then requires
// both of the following to hold for every index i of `buf`:
//   1. buf[i] == (curpos+i+1) % 106   (synthetic test pattern), and
//   2. buf[i] == byte (curpos+i) of the reference file `fp`.
// Returns FALSE on the first mismatch, when `fp` is not set, or when the
// reference file yields fewer bytes than `buf` holds.
BOOL CheckBuffer(int curpos,CString& buf)
{
#ifndef DLIF_VERIFY_DOWNLOAD_BUFFER
	return TRUE;
#else
	const int len=buf.GetLength();
	int i;

	for(i=0;i<len;i++)
	{
		if( buf.GetAt(i)!=(curpos+i+1)%106)
			return FALSE;
	}

	// Nothing left to compare for an empty buffer; also avoids new BYTE[0].
	if(len==0)
		return TRUE;

	// The reference file is never opened in this file; do not dereference NULL.
	if(fp==NULL)
		return FALSE;

	BYTE *bytes=new BYTE[len];
	BOOL same=TRUE;

	fp->Seek(curpos,CFile::begin);
	if( fp->Read(bytes,len)!=(UINT)len)
	{
		// Short read: the tail of `bytes` would be uninitialised.
		same=FALSE;
	}
	else
	{
		for(i=0;i<len;i++)
		{
			if( buf.GetAt(i)!=bytes[i])
			{
				same=FALSE;
				break;
			}
		}
	}

	// Allocated with new[], so it must be released with delete[].
	delete[] bytes;
	return same;
#endif
}


// Bookkeeping shared by the manager thread (th_dlmgr) and the download
// workers (th_download). Both lists are allocated and deleted by th_dlmgr and
// store JThread pointers.
// g_downth_list: workers created by th_dlmgr; a worker removes itself just
// before it exits.
CPtrList *g_downth_list;
// g_downth_free_list: workers that removed themselves from g_downth_list;
// th_dlmgr calls WaitForSafeClose() on each and deletes it.
CPtrList *g_downth_free_list;
// Locked around every access to the two lists above (the final RemoveAll and
// the deletes in th_dlmgr happen after its last Unlock).
CSect	  g_down_cs;



// Download manager thread.
//
// Owns the two worker lists (g_downth_list / g_downth_free_list). Until
// th_mgr is asked to shut down it repeatedly:
//   - reads the number of STATUS_READY queue entries,
//   - reaps workers parked on the free list (WaitForSafeClose + delete),
//   - starts one more th_download worker when there is ready work, the ready
//     count exceeds MINTASK_PERTHREAD per existing worker, and the worker
//     count is below config->m_maxthreads,
//   - calls JThread::Sleep(1000).
// On shutdown it signals every worker, waits until all of them have left
// g_downth_list, reaps the free list and deletes both lists.
THREAD_RETTYPE WINAPI th_dlmgr( JThread *th_mgr)
{
	JThread *worker;
	POSITION pos;
	int readyCount,workerCount,workerLimit,remaining;

	//fp=new CStdioFile;
	//fp->Open("f:/106.dat",CFile::shareDenyWrite);

	DLConfig *cfg=g_config;
	ASSERT( cfg);

	g_downth_list=new CPtrList;
	g_downth_free_list=new CPtrList;

	// ---- supervision loop ------------------------------------------------
	for(;;)
	{
		if(th_mgr->IsShutdown())
			break;

		readyCount=cfg->count_downque(STATUS_READY);

		g_down_cs.Lock(DEBUGMTPARAM);

		workerCount=g_downth_list->GetCount();
		workerLimit=cfg->m_maxthreads;

		// Reap every worker that has already parked itself on the free list.
		for(;!g_downth_free_list->IsEmpty();)
		{
			worker=(JThread*)g_downth_free_list->RemoveHead();
			worker->WaitForSafeClose();
			delete worker;
		}

		g_down_cs.Unlock();

		if(readyCount>0 && readyCount>workerCount*MINTASK_PERTHREAD && workerCount<workerLimit)
		{
			worker=new JThread();

			// Register the worker before starting it, so that it can find
			// itself in g_downth_list when it exits.
			g_down_cs.Lock(DEBUGMTPARAM);
			g_downth_list->AddTail(worker);
			g_down_cs.Unlock();

			worker->m_userdata =(superint_ut)cfg;
			worker->Start((THREADFUN)th_download);

			g_mon->printf("create new download thread.\r\n");
		}

		JThread::Sleep(1000);
	}

	// ---- shutdown: signal every registered worker ---------------------------
	g_down_cs.Lock(DEBUGMTPARAM);
	for(pos=g_downth_list->GetHeadPosition();pos;)
	{
		worker=(JThread*)g_downth_list->GetNext(pos);
		worker->Shutdown();
	}
	g_down_cs.Unlock();

	// ---- wait until every worker has removed itself from g_downth_list ------
	for(;;)
	{
		g_down_cs.Lock(DEBUGMTPARAM);
		remaining=g_downth_list->GetCount();
		g_down_cs.Unlock();

		if(remaining==0)
			break;

		JThread::Sleep(1000);
	}

	// ---- reap whatever is parked on the free list ---------------------------
	g_down_cs.Lock(DEBUGMTPARAM);
	for(pos=g_downth_free_list->GetHeadPosition();pos;)
	{
		worker=(JThread*)g_downth_free_list->GetNext(pos);
		worker->WaitForSafeClose();
		delete worker;
	}
	g_down_cs.Unlock();

	g_downth_free_list->RemoveAll();

	delete g_downth_list;
	delete g_downth_free_list;

	JTHREAD_EXIT( 0);
}


//下载线程
THREAD_RETTYPE WINAPI th_download( JThread *th)
{
	DLConfig *config=(DLConfig *)th->m_userdata;
	BaseNetClient *hammer,*dc;
	NetTable downs,nt,olddata,downrows,delrows;

	int outcount=0;
	int i,j,k;

	int uid;
	CString mfid,pipeid,pipename,clntip,remoteip;
	int startpos,endpos,curpos,size;
	char gettype;
	superint_ut rowid;
	CString buf;
	int retcode;
	BOOL bret;

	
	downs.Clear();
	delrows.Clear();
	delrows.AddCol(COLUMN_ROWID,NET_SUPERLONG);
	delrows.AddCol("downsrow",NET_LONG);

	while(!th->IsShutdown())
	{
		if(downs.RowCount()<MINTASK_PERTHREAD)
		{
			nt.Clear();
			config->query_downque(nt,MAXTASK_PERTHREAD-downs.RowCount());

			if(nt.RowCount())
			{
				nt.CopyDefTo(&olddata);
				if(downs.ColCount()==0)
					nt.CopyDefTo(&downs);

				olddata.AddRow();
				for(i=1;i<=nt.RowCount();i++)
				{
					nt.CopyRow(&olddata,1,i);
					if( config->take_downitem(olddata))
					{
						downs.AddRow(&olddata);
					}
				}
			}
		}

		if(th->IsShutdown())
			break;

		if(downs.RowCount()==0)
		{
			JThread::Sleep(1000);
			
			outcount++;

			if(outcount>30)
				break;
			else
				continue;
		}
		else
			outcount=0;

		//pooling download
		downrows.DeleteAllRow();
		delrows.DeleteAllRow();
		for(i=1;i<=downs.RowCount();i++)
		{
			clntip=downs.GetString(i,"clntip");
			pipename=downs.GetString(i,"pipename");

			//remoteip.Empty();
			hammer=config->take_hammer(clntip);
			if( call_hammerif_stdoutpipeisfinished(hammer,pipename,&retcode) && retcode==1)
			{
				j=delrows.AddRow();
				//downs.CopyRowSmart(&delrows,j,i);
				delrows.SetSuperLong(j,COLUMN_ROWID,downs.GetSuperLong(i,COLUMN_ROWID));
				delrows.SetLong(j,"downsrow",i);
			}
			else
			if( isNeedDownload(hammer,pipename))
			{
				if(downrows.ColCount()==0)
				{
					downs.CopyDefTo(&downrows);
					downrows.AddCol("downsrow",NET_LONG);
				}
				j=downrows.AddRow();
				downs.CopyRowSmart(&downrows,j,i);
				downs.SetLong(j,"downsrow",i);
			}

			config->free_hammer(hammer);
		}

		//connect dc,start download
		if(downrows.RowCount()==0)
		{
			if(delrows.RowCount())
			{
				//config->downque_remove(delrows);
				delrows.QuickSort( delrows.GetColIndex("downsrow"));
				for(i=delrows.RowCount();i>0;i--)
					downs.DeleteRow( delrows.GetLong(i,"downsrow"));
			}

			JThread::Sleep(200);
			continue;
		}


		dc=config->take_dc();
		for(i=1;i<=downrows.RowCount();i++)
		{
			uid=downrows.GetLong(i,"uid");
			mfid=downrows.GetString(i,"mfid");
			pipeid=downrows.GetString(i,"pipeid");

			startpos=downrows.GetLong(i,"startpos");
			endpos=downrows.GetLong(i,"endpos");
			curpos=downrows.GetLong(i,"curpos");

			size=downrows.GetLong(i,"size");
			gettype=downrows.GetChar(i,"gettype");
			gettype|=0x20;

			pipename=downrows.GetString(i,"pipename");
			clntip=downs.GetString(i,"clntip");

			rowid=downrows.GetSuperLong(i,COLUMN_ROWID);
			
			buf.Empty();
			if(size)
			{
				bret=call_bodypipe_read(dc,uid,mfid,pipeid,min(WRITEBLOB_SIZE,endpos+1-curpos),&buf);

				ASSERT(CheckBuffer(curpos,buf));

				//char ch;
				//if(buf.GetLength())
				//	ch=buf[0];
				//else
				//	ch=255;
				
				//remoteip.Format("downloading start:%d, current:%d, end:%d\r\n"
				//	"checkbuffer start:%d, current:%d, pipename: %s -- 0x%02X\r\n",
				//	startpos,curpos,endpos,startpos,curpos,(LPCSTR)pipename,ch);
				//wlog("c:/tmpdlsrv.log",(LPCSTR)remoteip);
				//g_mon->printf(remoteip);
			}
			else
				bret=call_bodypipe_read(dc,uid,mfid,pipeid,WRITEBLOB_SIZE,&buf);

			if(!bret)
			{
				g_mon->printf("*******************************bodypipe_read failure!\r\n");
				//读取失败
				call_bodypipe_abort(dc,uid,mfid,pipeid,&retcode);

				hammer=config->take_hammer(clntip);
				call_hammerif_stdoutpipeabort(hammer,pipename,&retcode);
				config->free_hammer(hammer);
				
				k=downs.Find("pipeid",pipeid);
				ASSERT(k==downrows.GetLong(i,"downsrow"));

				j=delrows.AddRow();
				delrows.SetSuperLong(j,COLUMN_ROWID,downs.GetSuperLong(k,COLUMN_ROWID));
				delrows.SetLong(j,"downsrow",k);

				//config->downque_remove(pipeid);
				continue;
			}

			if(buf.GetLength()==0)
			{
				//下载完毕
				g_mon->printf("*******************************bodypipe_read end 1!\r\n");

				call_bodypipe_abort(dc,uid,mfid,pipeid,&retcode);

				hammer=config->take_hammer(clntip);
				call_hammerif_stdoutpipewriteend(hammer,pipename,&retcode);
				config->free_hammer(hammer);
				
				k=downs.Find("pipeid",pipeid);
				ASSERT(k==downrows.GetLong(i,"downsrow"));

				j=delrows.AddRow();
				delrows.SetSuperLong(j,COLUMN_ROWID,downs.GetSuperLong(k,COLUMN_ROWID));
				delrows.SetLong(j,"downsrow",k);

				//config->downque_remove(pipeid);
				continue;
			}

			if(gettype=='f')
			{
				CString bbuf;

				retcode=-1;
				hammer=config->take_hammer(clntip);
				if(decode_base64str(buf,&bbuf))
					call_hammerif_stdoutpipewrite(hammer,pipename,bbuf,&retcode);
				else
					call_hammerif_stdoutpipewrite(hammer,pipename,buf,&retcode);
				config->free_hammer(hammer);

				if(retcode==0)
				{
					g_mon->printf("writeclient failure %s, start:%d,current:%d,end:%d\r\n",(LPCSTR)mfid,startpos,curpos,endpos);
					call_bodypipe_abort(dc,uid,mfid,pipeid,&retcode);

					hammer=config->take_hammer(clntip);
					call_hammerif_stdoutpipeabort(hammer,pipename,&retcode);
					config->free_hammer(hammer);
					
					k=downs.Find("pipeid",pipeid);
					ASSERT(k==downrows.GetLong(i,"downsrow"));

					j=delrows.AddRow();
					delrows.SetSuperLong(j,COLUMN_ROWID,downs.GetSuperLong(k,COLUMN_ROWID));
					delrows.SetLong(j,"downsrow",k);

					//config->downque_remove(pipeid);
					continue;
				}
			}
			else
			{
				retcode=-1;
				hammer=config->take_hammer(clntip);
				call_hammerif_stdoutpipewrite(hammer,pipename,buf,&retcode);
				config->free_hammer(hammer);

				if(retcode==0)
				{
					g_mon->printf("writeclient failure %s, start:%d,current:%d,end:%d\r\n",(LPCSTR)mfid,startpos,curpos,endpos);
					call_bodypipe_abort(dc,uid,mfid,pipeid,&retcode);

					hammer=config->take_hammer(clntip);
					call_hammerif_stdoutpipeabort(hammer,pipename,&retcode);
					config->free_hammer(hammer);
					
					k=downs.Find("pipeid",pipeid);
					ASSERT(k==downrows.GetLong(i,"downsrow"));

					j=delrows.AddRow();
					delrows.SetSuperLong(j,COLUMN_ROWID,downs.GetSuperLong(k,COLUMN_ROWID));
					delrows.SetLong(j,"downsrow",k);

					//config->downque_remove(pipeid);
					continue;
				}
			}

			curpos+=buf.GetLength();
			if(size && curpos>=endpos+1)
			{
				g_mon->printf("*******************************bodypipe_read end 2!\r\n");

				call_bodypipe_abort(dc,uid,mfid,pipeid,&retcode);

				hammer=config->take_hammer(clntip);
				call_hammerif_stdoutpipewriteend(hammer,pipename,&retcode);
				config->free_hammer(hammer);

				if(gettype=='n' && size && curpos>=size)
					call_netfile_updowncount(dc,uid,mfid,&retcode);
				
				k=downs.Find("pipeid",pipeid);
				ASSERT(k==downrows.GetLong(i,"downsrow"));

				j=delrows.AddRow();
				delrows.SetSuperLong(j,COLUMN_ROWID,downs.GetSuperLong(k,COLUMN_ROWID));
				delrows.SetLong(j,"downsrow",k);

				//config->downque_remove(pipeid);
			}
			else
			{
				j=downs.Find("pipeid",pipeid);
				ASSERT(j==downrows.GetLong(i,"downsrow"));

				downs.SetLong(j,"curpos",curpos);
				downs.SetDatetime(j,"dtime",nowtime());

				//ASSERT(downs.GetSuperLong(j,"$rowid")==rowid)
				//if(j!=i)
				//{
				//downs.print_rows();
				//}
				//ASSERT(j==i)

				config->downque_updatepos(rowid,curpos);
			}
		}
		config->free_dc(dc);

		if(delrows.RowCount())
		{
			//config->downque_remove(delrows);
			delrows.QuickSort( delrows.GetColIndex("downsrow"));
			for(i=delrows.RowCount();i>0;i--)
				downs.DeleteRow( delrows.GetLong(i,"downsrow"));
		}

		JThread::Sleep(50);
	}


	
	for(i=1;i<=downs.RowCount();i++)
	{
		rowid=downs.GetSuperLong(i,COLUMN_ROWID);
		config->downque_updatestatus(rowid,STATUS_READY);
	}

	POSITION pos;

	g_down_cs.Lock(DEBUGMTPARAM);

	pos=g_downth_list->Find(th);
	if(pos)
	{
		g_downth_list->RemoveAt(pos);
		g_downth_free_list->AddTail(th);
	}
	g_down_cs.Unlock();

	g_mon->printf("download thread exit.\r\n");

	JTHREAD_EXIT( 0);
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -