⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 opt_remotequeries.mx

📁 一个内存数据库的源代码这是服务器端还有客户端
💻 MX
字号:
@' The contents of this file are subject to the MonetDB Public License
@' Version 1.1 (the "License"); you may not use this file except in
@' compliance with the License. You may obtain a copy of the License at
@' http://monetdb.cwi.nl/Legal/MonetDBLicense-1.1.html
@'
@' Software distributed under the License is distributed on an "AS IS"
@' basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
@' License for the specific language governing rights and limitations
@' under the License.
@'
@' The Original Code is the MonetDB Database System.
@'
@' The Initial Developer of the Original Code is CWI.
@' Portions created by CWI are Copyright (C) 1997-2007 CWI.
@' All Rights Reserved.

@f opt_remoteQueries
@a M. Kersten
@- Remote Queries
MAL variables may live at a different site from where they are used.
In particular, the SQL front-end uses portions of remote BATs as
replication views. Each time such a view is needed,
the corresponding BAT is fetched and added to the local cache.

Consider the following snippet produced by a query compiler,
@verbatim
mid:= mserver.reconnect("s0_0","localhost",50000,"monetdb","monetdb","mal");
b:bat[:oid,:int] := mserver.bind(mid,"rvar");
c:=algebra.select(b,0,12);
io.print(c);
d:=algebra.select(b,5,10);
low:= 5+1;
e:=algebra.select(d,low,7);
i:=aggr.count(e);
io.printf(" count %d\n",i);
io.print(d);
@end verbatim
which uses a BAT @sc{rvar} stored at the remote site @sc{db1}.
There are several options to execute this query.
The remote BAT can be fetched as soon as the bind operation is executed,
or a portion can be fetched after a remote select,
or only the output for the user could be retrieved.
An optimal solution depends on the actual resources available at both ends
and the time to ship the BAT.

The remote query optimizer assumes that the remote site has sufficient
resources to handle the instructions.
For each remote database it creates a private connection,
which is re-used in subsequent calls. The remote environment is used to execute
the statements.
The objects are retrieved just before they are locally needed.
@verbatim
mid:= mserver.reconnect("s0_0","localhost",50000,"monetdb","monetdb","mal");
mserver.rpc(mid,"b:bat[:oid,:int] :=bbp.bind(\"rvar\");");
mserver.rpc(mid,"c:=algebra.select(b,0,12);");
c:bat[:oid,:int]:= mserver.rpc(mid, "io.print(c);");
io.print(c);
mserver.rpc(mid,"d:=algebra.select(b,5,10);");
low:= 5+1;
mserver.put(mid,"low",low);
mserver.rpc(mid,"e:=algebra.select(d,low,7);");
mserver.rpc(mid,"i:=aggr.count(e);");
i:= mserver.rpc(mid,"io.print(i);");
io.printf(" count %d\n",i);
io.print(d);
@end verbatim
To reduce the number of interprocess communications
this code can be further improved by gluing
the instructions together until the first
result is needed.
@{
Gluing statements together should respect the flow of control.
It seems that we can safely concatenate subsequent
requests.
@verbatim
    mid:= mserver.reconnect("s0_0","localhost",50000,"monetdb","monetdb","mal");
    mserver.rpc(mid,"b:bat[:oid,:int] :=bbp.bind(\"rvar\");");
    mserver.rpc(mid,"c:=algebra.select(b,0,12);");
    c:bat[:oid,:int]:= mserver.rpc(mid, "io.print(c);");
    io.print(c);
    mserver.rpc(mid,"d:=algebra.select(b,5,10);");
    low:= 5+1;
    mserver.put(mid,"low",low);
    i:= mserver.rpc(mid,"e:=algebra.select(d,low,7); i:=aggr.count(e); io.print(i);");
    io.printf(" count %d\n",i);
    io.print(d);
@end verbatim
@mal
pattern optimizer.remoteQueries():str
address OPTremoteQueries;
pattern optimizer.remoteQueries(mod:str, fcn:str):str
address OPTremoteQueries
comment "Ship instructions that use remote variables to their remote site";
@h
#ifndef _OPT_REMOTE_
#define _OPT_REMOTE_
#include "opt_prelude.h"
#include "opt_support.h"

/* #define DEBUG_OPT_REMOTE     show partial result */
#endif
@c
#include "mal_config.h"
#include "opt_remoteQueries.h"
#include "mal_interpreter.h"	/* for showErrors() */
#include "mal_builder.h"
@-
The instruction sent is produced with a variation of call2str
from the debugger. The text is collected in a buffer that grows
on demand, because neither the number of arguments nor the
length of a formatted constant is bounded.
@c
/* A growing, NUL-terminated text buffer used by RQcall2str. */
typedef struct {
	str buf;	/* text collected so far, always NUL-terminated */
	size_t len;	/* strlen(buf) */
	size_t cap;	/* bytes allocated for buf */
	int failed;	/* set once enlarging buf failed */
} RQbuffer;

/*
 * Append txt to the buffer, doubling its capacity when needed.
 * When an enlargement fails the buffer keeps its previous contents,
 * 'failed' is set and all further appends are ignored, so callers
 * only need to test 'failed' once at the end.
 * A NULL txt appends nothing.
 */
static void
RQappend(RQbuffer *b, const char *txt)
{
	size_t n, need;
	str nbuf;

	if (b->failed || txt == NULL)
		return;
	n = strlen(txt);
	need = b->len + n + 1;
	if (need > b->cap) {
		size_t ncap = b->cap;

		while (ncap < need)
			ncap *= 2;
		nbuf = (str) GDKmalloc(ncap);
		if (nbuf == NULL) {
			b->failed = 1;
			return;
		}
		memcpy(nbuf, b->buf, b->len + 1);
		GDKfree(b->buf);
		b->buf = nbuf;
		b->cap = ncap;
	}
	memcpy(b->buf + b->len, txt, n + 1);
	b->len += n;
}

/*
 * Render instruction p of mb as MAL text, e.g. "#x:= mod.fcn(a,1:int);".
 * The leading '#' is not part of the statement; callers ship s+1.
 * Constants are printed inline with their type name (void constants as
 * "nil"), temporaries as REFMARKER followed by their tmpindex, and all
 * other variables by name.
 * Returns a GDKmalloc'ed string the caller must GDKfree, or NULL when
 * memory is exhausted.
 */
static str
RQcall2str(MalBlkPtr mb, InstrPtr p)
{
	RQbuffer b;
	char ref[32];
	str cv = NULL;
	int k;

	b.len = 0;
	b.cap = BUFSIZ;
	b.failed = 0;
	b.buf = (str) GDKmalloc(b.cap);
	if (b.buf == NULL)
		return NULL;
	b.buf[0] = 0;

	RQappend(&b, "#");
	if (p->barrier) {
		/* keep the keyword separated from the first target */
		RQappend(&b, operatorName(p->barrier));
		RQappend(&b, " ");
	}
	if (p->retc > 1)
		RQappend(&b, "(");
	for (k = 0; k < p->retc; k++) {
		VarPtr v = getVar(mb, getArg(p, k));

		if (v->isudftype) {
			str tpe = getTypeName(getVarType(mb, getArg(p, k)));

			RQappend(&b, v->name);
			RQappend(&b, ":");
			RQappend(&b, tpe);
			RQappend(&b, " ");
			GDKfree(tpe);
		} else if (isTmpVar(mb, getArg(p, k))) {
			snprintf(ref, sizeof(ref), "%c%d", REFMARKER, v->tmpindex);
			RQappend(&b, ref);
		} else
			RQappend(&b, v->name);
		if (k < p->retc - 1)
			RQappend(&b, ",");
	}
	if (p->retc > 1)
		RQappend(&b, ")");

	/* Append the call directly: searching for '(' afterwards would hit
	   the parenthesis of a multi-assignment target list. */
	RQappend(&b, ":= ");
	RQappend(&b, getModuleId(p));
	RQappend(&b, ".");
	RQappend(&b, getFunctionId(p));
	RQappend(&b, "(");
	for (k = p->retc; k < p->argc; k++) {
		VarPtr v = getVar(mb, getArg(p, k));

		if (isConstant(mb, getArg(p, k))) {
			if (v->type == TYPE_void) {
				RQappend(&b, "nil");
			} else {
				VALformat(&cv, &v->value);
				RQappend(&b, cv);
				RQappend(&b, ":");
				RQappend(&b, ATOMname(v->type));
				if (cv)
					GDKfree(cv);
				cv = NULL;
			}
		} else if (isTmpVar(mb, getArg(p, k))) {
			snprintf(ref, sizeof(ref), "%c%d", REFMARKER, v->tmpindex);
			RQappend(&b, ref);
		} else
			RQappend(&b, v->name);
		if (k < p->argc - 1)
			RQappend(&b, ",");
	}
	RQappend(&b, ");");

	if (b.failed) {
		GDKfree(b.buf);
		return NULL;
	}
	return b.buf;
}
@-
The algorithm follows the common scheme used so far.
Instructions are taken out one-by-one and copied
to the new block.
A local cache of connections is established, because
the statements related to a single remote database
should be executed in the same stack context.
A pitfall is to create multiple connections, each with
its own isolated runtime environment.

The macros below rely on the local variables of
OPTremoteQueriesImplementation being in scope.
@= lookupServer
	/* Find the connection handle for the database named by the given
	   argument of p. Constant names are cached in dbalias, so all
	   statements for one remote database share a single mserver.lookup
	   result; a non-constant name always gets a fresh lookup.
	   On exit j holds the connection variable and the target of p is
	   marked as living at that connection. */
	if( location[getArg(p,0)] == 0){
		db = 0;
		if( isConstant(mb,getArg(p,@1)) )
			db= getVarConstant(mb, getArg(p,@1)).val.sval;
		k = dbtop;
		if( db )
			for(k=0; k<dbtop; k++)
				if( dbalias[k].dbname && strcmp(db, dbalias[k].dbname)== 0)
					break;
		if( k== dbtop){
			r= newInstruction(mb,ASSIGNsymbol);
			getModuleId(r)= mserverRef;
			getFunctionId(r)= lookupRef;
			j= getArg(r,0)= newTmpVariable(mb, TYPE_int);
			pushArgument(mb,r, getArg(p,@1));
			pushInstruction(mb,r);
			/* only constant names can be matched later; when the cache
			   is full the name is simply not cached */
			if( db && dbtop < MAXDBALIAS){
				dbalias[dbtop].dbhdl= j;
				dbalias[dbtop++].dbname= db;
			}
		} else j= dbalias[k].dbhdl;
		location[getArg(p,0)]= j;
	} else j= location[getArg(p,0)];
@= prepareRemote
	/* Start, but do not yet push, an mserver.rpc instruction r with a
	   fresh result variable of the given type, sent over the given
	   connection variable. */
	r= newInstruction(mb,ASSIGNsymbol);
	getModuleId(r)= mserverRef;
	getFunctionId(r)= rpcRef;
	getArg(r,0)= newTmpVariable(mb, @1);
	pushArgument(mb,r,@2);
@= putRemoteVariables
	/* Before the rpc, ship every local non-constant argument of p to
	   the remote site with mserver.put(connection, name, value). */
	for(a=p->retc; a<p->argc; a++)
		if( location[getArg(p,a)] == 0 && !isConstant(mb,getArg(p,a)) ){
			q= newStmt(mb,mserverRef,putRef);
			getArg(q,0)= newTmpVariable(mb, TYPE_void);
			pushArgument(mb,q,@1);
			pushStr(mb,q, getRefName(mb,getArg(p,a)));
			pushArgument(mb,q,getArg(p,a));
		}
@= remoteAction
	/* Complete the rpc r with the text of p, emit it and drop p. */
	s= RQcall2str(mb,p);
	if( s ){
		pushStr(mb,r,s+1);	/* skip the leading '#' */
		GDKfree(s);
	}
	/* NOTE(review): if s is NULL (out of memory) the rpc is emitted
	   without its statement text. */
	pushInstruction(mb,r);
	freeInstruction(p);
	doit++;
@c
typedef struct{
	str dbname;	/* constant remote database name */
	int dbhdl;	/* variable holding its mserver.lookup connection */
} DBalias;

/* capacity of the per-invocation connection cache */
#define MAXDBALIAS 128

/*
 * Rewrite mb so that instructions on remote variables are executed at
 * their remote site through mserver.rpc / mserver.put calls, and remote
 * values are fetched only when an instruction must run locally.
 * The optimizer.remoteQueries call itself is removed from the plan.
 * stk and pci are unused. Returns the number of rewrite actions
 * performed; 0 is also returned when the bookkeeping array cannot be
 * allocated, in which case mb is left untouched.
 */
static int
OPTremoteQueriesImplementation(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
	InstrPtr p, q, r, *old;
	int i, j, a, k, cnt, limit, doit=0;
	int remoteSite,collectFirst;
	int *location;		/* variable -> connection variable, 0 = local */
	DBalias dbalias[MAXDBALIAS];
	int dbtop;
	char buf[BUFSIZ],*s, *db;
	ValRecord cst;

	cst.vtype= TYPE_int;
	cst.val.ival= 0;

#ifdef DEBUG_OPT_REMOTE
	stream_printf(GDKout, "RemoteQueries optimizer started\n");
#endif
	(void) stk;
	(void) pci;
	setLifespan(mb);
	limit = mb->stop;
	old = mb->stmt;

	/* heap instead of alloca: vsize may be large */
	location= (int*) GDKmalloc(mb->vsize * sizeof(int));
	if( location == NULL)
		return 0;
	memset((char*) location, 0, mb->vsize * sizeof(int));
	memset((char*) dbalias, 0, sizeof(dbalias));
	dbtop= 0;

	newMalBlkStmt(mb, mb->stop);

	for (i = 0; i < limit; i++) {
		p = old[i];

		/* count the arguments that currently live at a remote site */
		cnt=0;
		for(j=0; j<p->argc; j++)
			if (location[getArg(p,j)])
				cnt++;

		if( getModuleId(p)== mserverRef && getFunctionId(p)==bindRef){
			/* remote variable binding: x:= mserver.bind(conn,name) */
			if( p->argc == 3 && getArgType(mb,p,1) == TYPE_int ) {
				int tpe;

				freezeVarType(mb,getArg(p,0));
				j = getArg(p,1);	/* the connection handle */
				tpe = getArgType(mb,p,0);
				/* result is remote */
				location[getArg(p,0)]= j;
				/* turn the instruction into a local one:
				   drop the connection and only keep the name */
				p->argc--;
				getArg(p,1)= getArg(p,2);
				getModuleId(p) = bbpRef;
				@:prepareRemote(tpe,j)@
				@:putRemoteVariables(j)@
				@:remoteAction()@
			} else
				pushInstruction(mb,p);
		} else if( getModuleId(p)== sqlRef && getFunctionId(p)==evalRef ){
			if( p->argc == 3){
				/* a remote sql eval is needed */
				@:lookupServer(1)@
				/* turn the instruction into a local one:
				   drop the database and only keep the statement */
				p->argc--;
				getArg(p,1)= getArg(p,2);
				@:prepareRemote(TYPE_void,j)@
				@:remoteAction()@
			} else
				pushInstruction(mb,p);
		} else if( getModuleId(p)== sqlRef && getFunctionId(p)==bindRef ){
			if( p->argc == 6 && getArgType(mb,p,4) == TYPE_str ) {
				int tpe;

				freezeVarType(mb,getArg(p,0));
				tpe = getArgType(mb,p,0);
				@:lookupServer(4)@
				/* turn the instruction into a local one */
				getArg(p,4)= defConstant(mb, TYPE_int, &cst);
				@:prepareRemote(tpe,j)@
				@:putRemoteVariables(j)@
				@:remoteAction()@
			} else
				pushInstruction(mb,p);
		} else if( getModuleId(p)== sqlRef && getFunctionId(p)== binddbatRef) {
			if( p->argc == 5 && getArgType(mb,p,3) == TYPE_str ) {
				@:lookupServer(3)@
#ifdef DEBUG_OPT_REMOTE
				/* report before remoteAction frees p */
				stream_printf(GDKout, "found remote variable %s at %d\n",
					getVarName(mb,getArg(p,0)), location[getArg(p,0)]);
#endif
				/* turn the instruction into a local one */
				getArg(p,3)= defConstant(mb, TYPE_int, &cst);
				@:prepareRemote(TYPE_void,j)@
				@:putRemoteVariables(j)@
				@:remoteAction()@
			} else
				pushInstruction(mb,p);
		} else if( getModuleId(p) && strcmp(getModuleId(p),"optimizer")==0 &&
		    getFunctionId(p) && strcmp(getFunctionId(p),"remoteQueries")==0 )
			freeInstruction(p);
		else if (cnt == 0 || p->barrier) /* local only or flow control statement */
			pushInstruction(mb,p);
		else {
@-
The hard part is to decide what to do with instructions that
contain a reference to a remote variable.
In the first implementation we use the following policy.
If multiple sites are involved, all arguments are
fetched to the local site for processing.
The same holds for io instructions, for SQL result-set construction,
and for instructions that need a local BAT, because local BATs are
never shipped.
Otherwise the instruction is shipped to the single remote site
involved, preceded by mserver.put calls for its local non-constant
arguments.
@c
			/* determine the remote site and whether p must run locally */
			remoteSite=0;
			collectFirst= FALSE;
			for(j=0; j<p->argc; j++)
				if( location[getArg(p,j)]){
					if (remoteSite == 0)
						remoteSite= location[getArg(p,j)];
					else if( remoteSite != location[getArg(p,j)])
						collectFirst= TRUE;
				}
			/* output and result-set construction happen locally */
			if( getModuleId(p)== ioRef || (getModuleId(p)== sqlRef &&
			    (getFunctionId(p)== resultSetRef ||
			     getFunctionId(p)== rsColumnRef)))
				collectFirst= TRUE;
			/* local BATs are not shipped */
			if( remoteSite && collectFirst== FALSE)
				for(j=p->retc; j<p->argc; j++)
					if( location[getArg(p,j)] == 0 &&
					    isaBatType(getVarType(mb,getArg(p,j))))
						collectFirst= TRUE;

			if (collectFirst){
				/* fetch each remote argument, then perform p locally */
				for(j=p->retc; j<p->argc; j++)
					if( location[getArg(p,j)]){
						q= newStmt(mb,mserverRef,rpcRef);
						getArg(q,0)= getArg(p,j);
						pushArgument(mb,q,location[getArg(p,j)]);
						snprintf(buf,BUFSIZ,"io.print(%s);",
							getRefName(mb,getArg(p,j)) );
						pushStr(mb,q,buf);
					}
				pushInstruction(mb,p);
				/* as of now all the targets are also local */
				for(j=0; j<p->retc; j++)
					location[getArg(p,j)]= 0;
				doit++;
			} else if (remoteSite){
				/* single remote site involved: ship p there */
				@:prepareRemote(TYPE_void,remoteSite)@
				@:putRemoteVariables(remoteSite)@
				/* the targets now live at that site; record this
				   before remoteAction frees p */
				for(a=0; a<p->retc; a++)
					location[getArg(p,a)]= remoteSite;
				@:remoteAction()@
			} else
				pushInstruction(mb,p);
		}
	}
	GDKfree(old);
	GDKfree(location);
#ifdef DEBUG_OPT_REMOTE
	if (doit) {
		stream_printf(GDKout, "remoteQueries %d\n", doit);
		printFunction(GDKout, mb, LIST_MAL_ALL);
	}
#endif
	return doit;
}
@include optimizerWrapper.mx
@h
@:exportOptimizer(remoteQueries)@
@c
@:wrapOptimizer(remoteQueries,OPT_CHECK_ALL)@
@}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -