📄 opt_remotequeries.mx
字号:
@' The contents of this file are subject to the MonetDB Public License
@' Version 1.1 (the "License"); you may not use this file except in
@' compliance with the License. You may obtain a copy of the License at
@' http://monetdb.cwi.nl/Legal/MonetDBLicense-1.1.html
@'
@' Software distributed under the License is distributed on an "AS IS"
@' basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
@' License for the specific language governing rights and limitations
@' under the License.
@'
@' The Original Code is the MonetDB Database System.
@'
@' The Initial Developer of the Original Code is CWI.
@' Portions created by CWI are Copyright (C) 1997-2007 CWI.
@' All Rights Reserved.

@f opt_remoteQueries
@a M. Kersten
@- Remote Queries
MAL variables may live at a different site from where they are used.
In particular, the SQL front-end uses portions of remote
BATs as replication views. Each time such a view is needed,
the corresponding BAT is fetched and added to the local cache.

Consider the following snippet produced by a query compiler,
@verbatim
mid:= mserver.reconnect("s0_0","localhost",50000,"monetdb","monetdb","mal");
b:bat[:oid,:int] := mserver.bind(mid,"rvar");
c:=algebra.select(b,0,12);
io.print(c);
d:=algebra.select(b,5,10);
low:= 5+1;
e:=algebra.select(d,low,7);
i:=aggr.count(e);
io.printf(" count %d\n",i);
io.print(d);
@end verbatim
which uses a BAT @sc{rvar} stored at the remote site @sc{db1}.

There are several options to execute this query.
The remote BAT can be fetched as soon as the bind operation is executed,
or a portion can be fetched after a remote select,
or only the output for the user could be retrieved.
An optimal solution depends on the actual resources
available at both ends and the time to ship the BAT.

The remote query optimizer assumes that the remote site has sufficient
resources to handle the instructions.
For each remote site it creates a private connection,
which is re-used in subsequent calls.
The remote environment is used to execute
the statements.
The objects are retrievedjust before they are locally needed.@verbatimmid:= mserver.reconnect("s0_0","localhost",50000,"monetdb","monetdb","mal");mserver.rpc(mid,"b:bat[:oid,:int] :=bbp.bind(\"rvar\");");mserver.rpc(mid,"c:=algebra.select(b,0,12);");c:bat[:oid,:int]:= mserver.rpc(mid, "io.print(c);");io.print(c);mserver.rpc(mid,"d:=algebra.select(b,5,10);");low:= 5+1;mserver.put(mid,"low",low);mserver.rpc(mid,"e:=algebra.select(d,low,7);");mserver.rpc(mid,"i:=aggr.count(d);");i:= mserver.rpc(mid,"io.print(i);");io.printf(" count %d\n",i);io.print(d);@end verbatimTo reduce the number of interprocess communicationsthis code can be further improved by glueingthe instructions together when until the firstresult is needed. @{Glueing together statements should respect the flow.It seems that we can savely concatenate subsequencerequests.@verbatim mid:= mserver.reconnect("s0_0","localhost",50000,"monetdb","monetdb","mal"); mserver.rpc(mid,"b:bat[:oid,:int] :=bbp.bind(\"rvar\");"); mserver.rpc(mid,"c:=algebra.select(b,0,12);"); c:bat[:oid,:int]:= mserver.rpc(mid, "io.print(c);"); io.print(c); mserver.rpc(mid,"d:=algebra.select(b,5,10);"); low:= 5+1; mserver.put(mid,"low",low); i:= mserver.rpc(mid,"e:=algebra.select(d,low,7); i:=aggr.count(d); io.print(i);"); io.printf(" count %d\n",i); io.print(d);@end verbatim@malpattern optimizer.remoteQueries():straddress OPTremoteQueries;pattern optimizer.remoteQueries(mod:str, fcn:str):straddress OPTremoteQueriescomment "Resolve the multi-table definitions";@h#ifndef _OPT_REMOTE_#define _OPT_REMOTE_#include "opt_prelude.h"#include "opt_support.h"/* #define DEBUG_OPT_REMOTE show partial result */#endif@c#include "mal_config.h"#include "opt_remoteQueries.h"#include "mal_interpreter.h" /* for showErrors() */#include "mal_builder.h"@-The instruction sent is produced with a variation of call2strfrom the debugger.@cstatic strRQcall2str(MalBlkPtr mb, InstrPtr p){ int k,len=1; str msg; str s,cv= NULL; msg = (str) GDKmalloc(BUFSIZ); msg[0]='#'; 
msg[1]=0; if( p->barrier) strcat(msg, operatorName(p->barrier)); if( p->retc > 1) strcat(msg,"("); len= strlen(msg); for (k = 0; k < p->retc; k++) { VarPtr v = getVar(mb, getArg(p, k)); if( v->isudftype){ str tpe = getTypeName(getVarType(mb, getArg(p, k))); sprintf(msg+len, "%s:%s ", v->name, tpe); GDKfree(tpe); } else if (isTmpVar(mb, getArg(p,k))) sprintf(msg+len, "%c%d", REFMARKER, v->tmpindex); else sprintf(msg+len, "%s", v->name); if (k < p->retc - 1) strcat(msg,","); len= strlen(msg); } if( p->retc > 1) strcat(msg,")"); sprintf(msg+len,":= %s.%s(",getModuleId(p),getFunctionId(p)); s = strchr(msg, '('); if (s) { s++; *s = 0; len = strlen(msg); for (k = p->retc; k < p->argc; k++) { VarPtr v = getVar(mb, getArg(p, k)); if( isConstant(mb, getArg(p,k)) ){ if( v->type == TYPE_void) { sprintf(msg+len, "nil"); } else { VALformat(&cv, &v->value); sprintf(msg+len,"%s:%s",cv, ATOMname(v->type)); GDKfree(cv); } } else if (isTmpVar(mb, getArg(p,k))) sprintf(msg+len, "%c%d", REFMARKER, v->tmpindex); else sprintf(msg+len, "%s", v->name); if (k < p->argc - 1) strcat(msg,","); len= strlen(msg); } strcat(msg,");"); } /* printf("#RQcall:%s\n",msg);*/ return msg;}@- The algorithm follows the common scheme used so far.Instructions are taken out one-by-one and copiedto the new block.A local cache of connections is established, becausethe statements related to a single remote databaseshould be executed in the same stack context.A pitfall is to create multiple connections withtheir isolated runtime environment.@= lookupServer /* lookup the server connection */ if( location[getArg(p,0)] == 0){ db = 0; if( isConstant(mb,getArg(p,@1)) ) db= getVarConstant(mb, getArg(p,@1)).val.sval; for(k=0; k<dbtop; k++) if( strcmp(db, dbalias[k].dbname)== 0) break; if( k== dbtop){ r= newInstruction(mb,ASSIGNsymbol); getModuleId(r)= mserverRef; getFunctionId(r)= lookupRef; j= getArg(r,0)= newTmpVariable(mb, TYPE_int); pushArgument(mb,r, getArg(p,@1)); pushInstruction(mb,r); dbalias[dbtop].dbhdl= j; 
dbalias[dbtop++].dbname= db; if( dbtop== 127) dbtop--; } else j= dbalias[k].dbhdl; location[getArg(p,0)]= j; } else j= location[getArg(p,0)];@= prepareRemote r= newInstruction(mb,ASSIGNsymbol); getModuleId(r)= mserverRef; getFunctionId(r)= rpcRef; getArg(r,0)= newTmpVariable(mb, @1); pushArgument(mb,r,j);@= putRemoteVariables for(j=p->retc; j<p->argc; j++) if( location[getArg(p,j)] == 0 && !isConstant(mb,getArg(p,j)) ){ q= newStmt(mb,mserverRef,putRef); getArg(q,0)= newTmpVariable(mb, TYPE_void); pushArgument(mb,q,location[getArg(p,j)]); pushStr(mb,q, getRefName(mb,getArg(p,j))); pushArgument(mb,q,getArg(p,j)); }@= remoteAction s= RQcall2str(mb,p); pushStr(mb,r,s+1); GDKfree(s); pushInstruction(mb,r); freeInstruction(p); doit++;@ctypedef struct{ str dbname; int dbhdl;} DBalias;static intOPTremoteQueriesImplementation(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){ InstrPtr p, q, r, *old; int i, j, cnt, limit, doit=0; int remoteSite,collectFirst; int *location; DBalias *dbalias; int dbtop,k; char buf[BUFSIZ],*s, *db; ValRecord cst; cst.vtype= TYPE_int; cst.val.ival= 0;#ifdef DEBUG_OPT_REMOTE stream_printf(GDKout, "RemoteQueries optimizer started\n");#endif (void) stk; (void) pci; setLifespan(mb); limit = mb->stop; old = mb->stmt; location= alloca(mb->vsize * sizeof(int)); memset((char*) location, 0, mb->vsize * sizeof(int)); dbalias= (DBalias*) alloca(128 * sizeof(DBalias)); memset((char*) dbalias, 0, 128 * sizeof(DBalias)); dbtop= 0; newMalBlkStmt(mb, mb->stop); for (i = 0; i < limit; i++) { p = old[i]; /* detect remote instructions */ cnt=0; for(j=0; j<p->argc; j++) if (location[getArg(p,j)]) cnt++; /* detect remote variable binding */ if( (getModuleId(p)== mserverRef && getFunctionId(p)==bindRef)){ if( p->argc == 3 && getArgType(mb,p,1) == TYPE_int ) { int tpe; freezeVarType(mb,getArg(p,0)); j = getArg(p,1); /* lookupServer with key */ tpe = getArgType(mb,p,0); /* result is remote */ location[getArg(p,0)]= j; /* turn the instruction into a local one */ /* one 
argument less */ p->argc--; /* only use the second argument (string) */ getArg(p,1)= getArg(p,2); getModuleId(p) = bbpRef; @:prepareRemote(tpe)@ @:putRemoteVariables()@ @:remoteAction()@ } else pushInstruction(mb,p); } else if( (getModuleId(p)== sqlRef && getFunctionId(p)==evalRef) ){ if( p->argc == 3){ /* a remote sql eval is needed */ @:lookupServer(1)@ /* turn the instruction into a local one */ /* one argument less */ p->argc--; /* only use the second argument (string) */ getArg(p,1)= getArg(p,2); @:prepareRemote(TYPE_void)@ s= RQcall2str(mb,p); pushStr(mb,r,s+1); GDKfree(s); pushInstruction(mb,r); freeInstruction(p); doit++; } } else if( (getModuleId(p)== sqlRef && getFunctionId(p)==bindRef) ){ if( p->argc == 6 && getArgType(mb,p,4) == TYPE_str ) { int tpe; freezeVarType(mb,getArg(p,0)); j = getArg(p,1); /* lookupServer with key */ tpe = getArgType(mb,p,0); @:lookupServer(4)@ /* turn the instruction into a local one */ getArg(p,4)= defConstant(mb, TYPE_int, &cst); @:prepareRemote(tpe)@ @:putRemoteVariables()@ @:remoteAction()@ } else pushInstruction(mb,p); } else if(getModuleId(p)== sqlRef && getFunctionId(p)== binddbatRef) { if( p->argc == 5 && getArgType(mb,p,3) == TYPE_str ) { @:lookupServer(3)@ /* turn the instruction into a local one */ getArg(p,3)= defConstant(mb, TYPE_int, &cst); @:prepareRemote(TYPE_void)@ @:putRemoteVariables()@ @:remoteAction()@ } else pushInstruction(mb,p);#ifdef DEBUG_OPT_REMOTE printf("found remote variable %s ad %d\n", getVarName(mb,getArg(p,0)), location[getArg(p,0)]);#endif } else if( getModuleId(p) && strcmp(getModuleId(p),"optimizer")==0 && getFunctionId(p) && strcmp(getFunctionId(p),"remoteQueries")==0 ) freeInstruction(p); else if (cnt == 0 || p->barrier) /* local only or flow control statement */ pushInstruction(mb,p); else {@-The hard part is to decide what to do with instructions thatcontain a reference to a remote variable.In the first implementation we use the following policy.If there are multiple sites involved, all 
arguments aremoved local for processing. Moreover, all local argumentsto be shipped should be simple.@c remoteSite=0; collectFirst= FALSE; for(j=0; j<p->argc; j++) if( location[getArg(p,j)]){ if (remoteSite == 0) remoteSite= location[getArg(p,j)]; else if( remoteSite != location[getArg(p,j)]) collectFirst= TRUE; } if( getModuleId(p)== ioRef || (getModuleId(p)== sqlRef && (getFunctionId(p)== resultSetRef || getFunctionId(p)== rsColumnRef))) collectFirst= TRUE; /* local BATs are not shipped */ if( remoteSite && collectFirst== FALSE) for(j=p->retc; j<p->argc; j++) if( location[getArg(p,j)] == 0 && isaBatType(getVarType(mb,getArg(p,j)))) collectFirst= TRUE; if (collectFirst){ /* perform locally */ for(j=p->retc; j<p->argc; j++) if( location[getArg(p,j)]){ q= newStmt(mb,mserverRef,rpcRef); getArg(q,0)= getArg(p,j); pushArgument(mb,q,location[getArg(p,j)]); snprintf(buf,BUFSIZ,"io.print(%s);", getRefName(mb,getArg(p,j)) ); pushStr(mb,q,buf); } pushInstruction(mb,p); /* as of now all the targets are also local */ for(j=0; j<p->retc; j++) location[getArg(p,j)]= 0; doit++; } else if (remoteSite){ /* single remote site involved */ r= newInstruction(mb,ASSIGNsymbol); getModuleId(r)= mserverRef; getFunctionId(r)= rpcRef; getArg(r,0)= newTmpVariable(mb, TYPE_void); pushArgument(mb, r, remoteSite); for(j=p->retc; j<p->argc; j++) if( location[getArg(p,j)] == 0 && !isConstant(mb,getArg(p,j)) ){ q= newStmt(mb,mserverRef,putRef); getArg(q,0)= newTmpVariable(mb, TYPE_void); pushArgument(mb, q, remoteSite); pushStr(mb,q, getRefName(mb,getArg(p,j))); pushArgument(mb, q, getArg(p,j)); } s= RQcall2str(mb, p); pushInstruction(mb,r); pushStr(mb,r,s+1); GDKfree(s); for(j=0; j<p->retc; j++) location[getArg(p,j)]= remoteSite; freeInstruction(p); doit++; } else pushInstruction(mb,p); } } GDKfree(old);#ifdef DEBUG_OPT_REMOTE if (doit) { stream_printf(GDKout, "remoteQueries %d\n", doit); printFunction(GDKout, mb, LIST_MAL_ALL); }#endif return doit;}@include 
optimizerWrapper.mx@h@:exportOptimizer(remoteQueries)@@c@:wrapOptimizer(remoteQueries,OPT_CHECK_ALL)@@}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -