📄 dlif.cpp.bak
字号:
#include "minifc.h"
#include "hammerif.h"
#include "dlif.h"
#include "smcic.h"
#include "config.h"
extern DLConfig *g_config;
extern CMonitorServer *g_mon;
struct srpc_if dlif[]=
{
//{SRPC_OPEN, "", (void*)downif_open,0 },
//{SRPC_CLOSE, "", (void*)downif_close,0 },
{"downif_adddown", "issiiiss", (void*)dlif_add_downque, NULL}
};
//typedef struct downif_clnt
//{
// CString addr;
// UINT port;
//} DOWNIF_CLNT;
//int __stdcall downif_open(JThread *th,StdSocket *clnt,superint_ut *ud)
//{
// DOWNIF_CLNT *pclnt=new DOWNIF_CLNT;
//
// clnt->GetPeerName(pclnt->addr, pclnt->port);
// *ud=(superint_ut)pclnt;
//
// return PROC_CALL_OK;
//}
//
//int __stdcall downif_close(JThread *th,StdSocket *clnt,superint_ut *ud)
//{
// DOWNIF_CLNT *pclnt=(DOWNIF_CLNT *)(*ud);
// delete pclnt;
//
// return PROC_CALL_OK;
//}
//
//
int __stdcall dlif_add_downque(SRPC_CONTEXT *context,superint_ut ud,NetDataTuple *arg,NetDataTuple *result,CString &strError)
{
int uid;
CString mfid,pipeid,pipename,gtype;
int startpos,endpos,size;
char gettype;
superint_ut rowid;
uid=arg->GetItemLong(0);
mfid=arg->GetItemString(1);
pipeid=arg->GetItemString(2);
startpos=arg->GetItemLong(3);
endpos=arg->GetItemLong(4);
size=arg->GetItemLong(5);
gtype=arg->GetItemString(6);
pipename=arg->GetItemString(7);
if(gtype.GetLength())
gettype=gtype.GetAt(0);
else
gettype='g';
ASSERT( g_config);
CString clntip;
UINT port;
context->sock->GetPeerName(clntip,port);
result->Clear();
result->AddItems(1);
if( g_config->downque_insert(uid,mfid,pipeid,pipename,startpos,endpos,gettype,size,clntip,rowid))
result->SetItemLong(0,1);
else
result->SetItemLong(0,0);
return PROC_CALL_OK;
}
BOOL isNeedDownload(BaseNetClient *hammer, LPCSTR pipename)
{
//判断hammer中stdout pipe的数据进度来决定是否填充数据
return TRUE;
}
CFile *fp;
BOOL CheckBuffer(int curpos,CString& buf)
{
return TRUE;
int i;
for(i=0;i<buf.GetLength();i++)
{
if( buf.GetAt(i)!=(curpos+i+1)%106)
return FALSE;
}
BYTE *bytes=new BYTE[buf.GetLength()];
fp->Seek(curpos,CFile::begin);
fp->Read(bytes,buf.GetLength());
for(i=0;i<buf.GetLength();i++)
{
if( buf.GetAt(i)!=bytes[i])
{
delete bytes;
return FALSE;
}
}
delete bytes;
return TRUE;
}
CPtrList *g_downth_list;
CPtrList *g_downth_free_list;
CSect g_down_cs;
//下载管理线程
THREAD_RETTYPE WINAPI th_dlmgr( JThread *th_mgr)
{
int c,thc,maxc;
JThread *th;
POSITION pos;
//fp=new CStdioFile;
//fp->Open("f:/106.dat",CFile::shareDenyWrite);
DLConfig *config=g_config;
ASSERT( config);
g_downth_list=new CPtrList;
g_downth_free_list=new CPtrList;
while(!th_mgr->IsShutdown())
{
c=config->count_downque(STATUS_READY);
g_down_cs.Lock(DEBUGMTPARAM);
thc =g_downth_list->GetCount();
maxc=config->m_maxthreads;
while(!g_downth_free_list->IsEmpty())
{
th=(JThread*)g_downth_free_list->RemoveHead();
th->WaitForSafeClose();
delete th;
}
g_down_cs.Unlock();
if(c>0 && c>thc*MINTASK_PERTHREAD && thc<maxc)
{
th=new JThread();
g_down_cs.Lock(DEBUGMTPARAM);
g_downth_list->AddTail(th);
g_down_cs.Unlock();
th->m_userdata =(superint_ut)config;
th->Start((THREADFUN)th_download);
g_mon->printf("create new download thread.\r\n");
}
JThread::Sleep(1000);
}
g_down_cs.Lock(DEBUGMTPARAM);
pos=g_downth_list->GetHeadPosition();
while(pos)
{
th=(JThread*)g_downth_list->GetNext(pos);
th->Shutdown();
}
g_down_cs.Unlock();
while(TRUE)
{
g_down_cs.Lock(DEBUGMTPARAM);
c=g_downth_list->GetCount();
g_down_cs.Unlock();
if(c==0)
break;
JThread::Sleep(1000);
}
g_down_cs.Lock(DEBUGMTPARAM);
pos=g_downth_free_list->GetHeadPosition();
while(pos)
{
th=(JThread*)g_downth_free_list->GetNext(pos);
th->WaitForSafeClose();
delete th;
}
g_down_cs.Unlock();
g_downth_free_list->RemoveAll();
delete g_downth_list;
delete g_downth_free_list;
JTHREAD_EXIT( 0);
}
//下载线程
THREAD_RETTYPE WINAPI th_download( JThread *th)
{
DLConfig *config=(DLConfig *)th->m_userdata;
BaseNetClient *hammer,*dc;
NetTable downs,nt,olddata,downrows,delrows;
int outcount=0;
int i,j,k;
int uid;
CString mfid,pipeid,pipename,clntip,remoteip;
int startpos,endpos,curpos,size;
char gettype;
superint_ut rowid;
CString buf;
int retcode;
BOOL bret;
downs.Clear();
delrows.Clear();
delrows.AddCol(COLUMN_ROWID,NET_SUPERLONG);
delrows.AddCol("downsrow",NET_LONG);
while(!th->IsShutdown())
{
if(downs.RowCount()<MINTASK_PERTHREAD)
{
nt.Clear();
config->query_downque(nt,MAXTASK_PERTHREAD-downs.RowCount());
if(nt.RowCount())
{
nt.CopyDefTo(&olddata);
if(downs.ColCount()==0)
nt.CopyDefTo(&downs);
olddata.AddRow();
for(i=1;i<=nt.RowCount();i++)
{
nt.CopyRow(&olddata,1,i);
if( config->take_downitem(olddata))
{
downs.AddRow(&olddata);
}
}
}
}
if(th->IsShutdown())
break;
if(downs.RowCount()==0)
{
JThread::Sleep(1000);
outcount++;
if(outcount>30)
break;
else
continue;
}
else
outcount=0;
//pooling download
downrows.DeleteAllRow();
delrows.DeleteAllRow();
for(i=1;i<=downs.RowCount();i++)
{
clntip=downs.GetString(i,"clntip");
pipename=downs.GetString(i,"pipename");
//remoteip.Empty();
hammer=config->take_hammer(clntip);
if( call_hammerif_stdoutpipeisfinished(hammer,pipename,&retcode) && retcode==1)
{
j=delrows.AddRow();
//downs.CopyRowSmart(&delrows,j,i);
delrows.SetSuperLong(j,COLUMN_ROWID,downs.GetSuperLong(i,COLUMN_ROWID));
delrows.SetLong(j,"downsrow",i);
}
else
if( isNeedDownload(hammer,pipename))
{
if(downrows.ColCount()==0)
{
downs.CopyDefTo(&downrows);
downrows.AddCol("downsrow",NET_LONG);
}
j=downrows.AddRow();
downs.CopyRowSmart(&downrows,j,i);
downs.SetLong(j,"downsrow",i);
}
config->free_hammer(hammer);
}
//connect dc,start download
if(downrows.RowCount()==0)
{
if(delrows.RowCount())
{
//config->downque_remove(delrows);
delrows.QuickSort( delrows.GetColIndex("downsrow"));
for(i=delrows.RowCount();i>0;i--)
downs.DeleteRow( delrows.GetLong(i,"downsrow"));
}
JThread::Sleep(200);
continue;
}
dc=config->take_dc();
for(i=1;i<=downrows.RowCount();i++)
{
uid=downrows.GetLong(i,"uid");
mfid=downrows.GetString(i,"mfid");
pipeid=downrows.GetString(i,"pipeid");
startpos=downrows.GetLong(i,"startpos");
endpos=downrows.GetLong(i,"endpos");
curpos=downrows.GetLong(i,"curpos");
size=downrows.GetLong(i,"size");
gettype=downrows.GetChar(i,"gettype");
gettype|=0x20;
pipename=downrows.GetString(i,"pipename");
clntip=downs.GetString(i,"clntip");
rowid=downrows.GetSuperLong(i,COLUMN_ROWID);
buf.Empty();
if(size)
{
bret=call_bodypipe_read(dc,uid,mfid,pipeid,min(WRITEBLOB_SIZE,endpos+1-curpos),&buf);
ASSERT(CheckBuffer(curpos,buf));
//char ch;
//if(buf.GetLength())
// ch=buf[0];
//else
// ch=255;
//remoteip.Format("downloading start:%d, current:%d, end:%d\r\n"
// "checkbuffer start:%d, current:%d, pipename: %s -- 0x%02X\r\n",
// startpos,curpos,endpos,startpos,curpos,(LPCSTR)pipename,ch);
//wlog("c:/tmpdlsrv.log",(LPCSTR)remoteip);
//g_mon->printf(remoteip);
}
else
bret=call_bodypipe_read(dc,uid,mfid,pipeid,WRITEBLOB_SIZE,&buf);
if(!bret)
{
g_mon->printf("*******************************bodypipe_read failure!\r\n");
//读取失败
call_bodypipe_abort(dc,uid,mfid,pipeid,&retcode);
hammer=config->take_hammer(clntip);
call_hammerif_stdoutpipeabort(hammer,pipename,&retcode);
config->free_hammer(hammer);
k=downs.Find("pipeid",pipeid);
ASSERT(k==downrows.GetLong(i,"downsrow"));
j=delrows.AddRow();
delrows.SetSuperLong(j,COLUMN_ROWID,downs.GetSuperLong(k,COLUMN_ROWID));
delrows.SetLong(j,"downsrow",k);
//config->downque_remove(pipeid);
continue;
}
if(buf.GetLength()==0)
{
//下载完毕
g_mon->printf("*******************************bodypipe_read end 1!\r\n");
call_bodypipe_abort(dc,uid,mfid,pipeid,&retcode);
hammer=config->take_hammer(clntip);
call_hammerif_stdoutpipewriteend(hammer,pipename,&retcode);
config->free_hammer(hammer);
k=downs.Find("pipeid",pipeid);
ASSERT(k==downrows.GetLong(i,"downsrow"));
j=delrows.AddRow();
delrows.SetSuperLong(j,COLUMN_ROWID,downs.GetSuperLong(k,COLUMN_ROWID));
delrows.SetLong(j,"downsrow",k);
//config->downque_remove(pipeid);
continue;
}
if(gettype=='f')
{
CString bbuf;
retcode=-1;
hammer=config->take_hammer(clntip);
if(decode_base64str(buf,&bbuf))
call_hammerif_stdoutpipewrite(hammer,pipename,bbuf,&retcode);
else
call_hammerif_stdoutpipewrite(hammer,pipename,buf,&retcode);
config->free_hammer(hammer);
if(retcode==0)
{
g_mon->printf("writeclient failure %s, start:%d,current:%d,end:%d\r\n",(LPCSTR)mfid,startpos,curpos,endpos);
call_bodypipe_abort(dc,uid,mfid,pipeid,&retcode);
hammer=config->take_hammer(clntip);
call_hammerif_stdoutpipeabort(hammer,pipename,&retcode);
config->free_hammer(hammer);
k=downs.Find("pipeid",pipeid);
ASSERT(k==downrows.GetLong(i,"downsrow"));
j=delrows.AddRow();
delrows.SetSuperLong(j,COLUMN_ROWID,downs.GetSuperLong(k,COLUMN_ROWID));
delrows.SetLong(j,"downsrow",k);
//config->downque_remove(pipeid);
continue;
}
}
else
{
retcode=-1;
hammer=config->take_hammer(clntip);
call_hammerif_stdoutpipewrite(hammer,pipename,buf,&retcode);
config->free_hammer(hammer);
if(retcode==0)
{
g_mon->printf("writeclient failure %s, start:%d,current:%d,end:%d\r\n",(LPCSTR)mfid,startpos,curpos,endpos);
call_bodypipe_abort(dc,uid,mfid,pipeid,&retcode);
hammer=config->take_hammer(clntip);
call_hammerif_stdoutpipeabort(hammer,pipename,&retcode);
config->free_hammer(hammer);
k=downs.Find("pipeid",pipeid);
ASSERT(k==downrows.GetLong(i,"downsrow"));
j=delrows.AddRow();
delrows.SetSuperLong(j,COLUMN_ROWID,downs.GetSuperLong(k,COLUMN_ROWID));
delrows.SetLong(j,"downsrow",k);
//config->downque_remove(pipeid);
continue;
}
}
curpos+=buf.GetLength();
if(size && curpos>=endpos+1)
{
g_mon->printf("*******************************bodypipe_read end 2!\r\n");
call_bodypipe_abort(dc,uid,mfid,pipeid,&retcode);
hammer=config->take_hammer(clntip);
call_hammerif_stdoutpipewriteend(hammer,pipename,&retcode);
config->free_hammer(hammer);
if(gettype=='n' && size && curpos>=size)
call_netfile_updowncount(dc,uid,mfid,&retcode);
k=downs.Find("pipeid",pipeid);
ASSERT(k==downrows.GetLong(i,"downsrow"));
j=delrows.AddRow();
delrows.SetSuperLong(j,COLUMN_ROWID,downs.GetSuperLong(k,COLUMN_ROWID));
delrows.SetLong(j,"downsrow",k);
//config->downque_remove(pipeid);
}
else
{
j=downs.Find("pipeid",pipeid);
ASSERT(j==downrows.GetLong(i,"downsrow"));
downs.SetLong(j,"curpos",curpos);
downs.SetDatetime(j,"dtime",nowtime());
//ASSERT(downs.GetSuperLong(j,"$rowid")==rowid)
//if(j!=i)
//{
//downs.print_rows();
//}
//ASSERT(j==i)
config->downque_updatepos(rowid,curpos);
}
}
config->free_dc(dc);
if(delrows.RowCount())
{
//config->downque_remove(delrows);
delrows.QuickSort( delrows.GetColIndex("downsrow"));
for(i=delrows.RowCount();i>0;i--)
downs.DeleteRow( delrows.GetLong(i,"downsrow"));
}
JThread::Sleep(50);
}
for(i=1;i<=downs.RowCount();i++)
{
rowid=downs.GetSuperLong(i,COLUMN_ROWID);
config->downque_updatestatus(rowid,STATUS_READY);
}
POSITION pos;
g_down_cs.Lock(DEBUGMTPARAM);
pos=g_downth_list->Find(th);
if(pos)
{
g_downth_list->RemoveAt(pos);
g_downth_free_list->AddTail(th);
}
g_down_cs.Unlock();
g_mon->printf("download thread exit.\r\n");
JTHREAD_EXIT( 0);
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -