mnemosyne_exec.erl

来自「OTP是开放电信平台的简称」· ERL 代码 · 共 672 行 · 第 1/2 页

ERL
672
字号
%% ``The contents of this file are subject to the Erlang Public License,%% Version 1.1, (the "License"); you may not use this file except in%% compliance with the License. You should have received a copy of the%% Erlang Public License along with this software. If not, it can be%% retrieved via the world wide web at http://www.erlang.org/.%% %% Software distributed under the License is distributed on an "AS IS"%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See%% the License for the specific language governing rights and limitations%% under the License.%% %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings%% AB. All Rights Reserved.''%% %%     $Id$%%-module(mnemosyne_exec).-export([mk_patterns/1]).-export([setup/2]).-export([setup_collector_and_query/1, init_query/2,	 get_answers/3,	 kill_processes/1]).-export([start_simple_loop1/4, 	 start_simple_loop2/3,	 start_complicated_loop/3]). %% spawned%%-export([simple_loop1/4, simple_loop2/3, complicated_loop/3]). %% spawned%%-define(debug,42).-include("mnemosyne_debug.hrl").-include("mnemosyne_internal_form.hrl").%%%================================================================%%% 		Exports%%% eval_collect(Nsols, Query, Fun) where%%%	Fun = fun(Accu,P) is foldl'd over all bound patterns and the result is%%%			  returned%%% Ex1. fun(Accu,P) ->%%%          [P|Accu]%%%       end%%%     collects all solutions.%%% Ex2. fun(Accu,P) ->%%%          m:f(P),%%%          Accu%%%       end%%%     applies m:f on every solution but does not return any result-record(spec, {func,	       table = undefined,	       pattern	      }).%%%----------------------------------------------------------------%%% Cursor handling%%% %% Supposed to be called as:%%     H = setup_collector_and_query(Q),%%         ....%%     mnesia:transaction(%%         fun() ->%%	     Crsr = init_query(H [,Number_of_answers_to_prefetch]),%%	         ....%%	     loop over L1 = get_answers(Crsr [,Min,Max]),%%	         ....%%	   end),%%         ....%%     kill_processes(H)setup_collector_and_query(OptQuery) when is_record(OptQuery,optimizer_result) ->    Spec = #spec{func = fun(Accu,P) -> [P|Accu] end,		 pattern = OptQuery#optimizer_result.pattern},    CollectorPid =	case OptQuery#optimizer_result.how_to_eval of	    mnesia_match_build -> 		%% fetch all solutions using mnesia:match_object		[Code] = OptQuery#optimizer_result.code,		Bs = OptQuery#optimizer_result.common_bs,		?debugmsg(1, "Using mnesia:match_object(~w)\n",			  [Code#pred_sym.pattern]),		spawn_link(?MODULE, start_simple_loop1, [self(),Bs,Code,Spec]);	    query_eval when OptQuery#optimizer_result.code==[] ->		%% has the single answer as the pattern		Solutions = [OptQuery#optimizer_result.pattern],		spawn_link(?MODULE, start_simple_loop2, 			   [self(),Solutions,Spec]);	    query_eval -> %% Must start all processes		spawn_link(?MODULE, start_complicated_loop, 			   [self(),OptQuery,Spec])	end,    CursorOwner = self(),    {query_collector, CollectorPid, CursorOwner}.%%%----init_query({query_collector,CollectorPid,CursorOwner}, N) ->    Tid = mnesia:get_activity_id(),    legal_pid_tid(CollectorPid, CursorOwner, Tid),    CollectorPid ! {attention,CursorOwner},    CollectorPid ! {init_collector,CursorOwner,Tid,N},    {exec_cursor, CollectorPid, CursorOwner, Tid}.%%%----get_answers({exec_cursor,CollectorPid,CursorOwner,Tid}, Nmin, Nmax) ->    legal_pid_tid(CollectorPid, CursorOwner, Tid),    CollectorPid ! {get, CursorOwner, Tid, Nmin, Nmax},    receive	{answer,CollectorPid,Answers} ->	    Answers;	{fail,_,Cause} ->	    throw({'EXIT',Cause});	{mnesia_down, Node} ->	    mnesia:abort({node_not_running, Node})    end.    legal_pid_tid(CollectorPid, CursorOwner, Tid0) ->    Tid = mnesia:get_activity_id(),    CollectorProcessInfo = process_info(CollectorPid),    if 	CursorOwner =/= self() ->	    throw({'EXIT',{aborted,not_cursor_owner}});	Tid == undefined ->	    throw({'EXIT',{aborted,no_transaction}});	Tid =/= Tid0 ->	    throw({'EXIT',{aborted,wrong_transaction}});	CollectorProcessInfo == undefined ->	    throw({'EXIT',{aborted,no_collector}});	true ->	    ok    end.    %%%----kill_processes({query_collector,CollectorPid,CursorOwner}) ->    if 	CursorOwner =/= self() ->	    throw({'EXIT',{aborted,not_cursor_owner}});	true ->	    unlink(CollectorPid),	    exit(CollectorPid, deleted)    end.%%%----------------start_simple_loop1(Self,Bs,Code,Spec) ->    simple_loop1(Self,Bs,Code,Spec).start_simple_loop2(Self,Solutions,Spec) ->    simple_loop2(Self,Solutions,Spec).start_complicated_loop(Self,OptQuery,Spec) ->    complicated_loop(Self,OptQuery,Spec).simple_loop1(CursorOwner, Bs, Code, Spec) ->    ?debugmsg(1,	      "simple_loop1(CursorOwner=~w, Bs=~w, Code=~w, Spec=~w)\n",	      [CursorOwner, Bs, Code, Spec]	     ),    receive	{init_collector, CursorOwner, Tid, Nprefetch} ->	    case Spec#spec.table of		undefined -> true;		_ -> ets:delete (Spec#spec.table)	    end,	    Spec2 = Spec#spec{table=ets:new(filter2,[set,public])},	    mnesia:put_activity_id(Tid),	    case catch mnemosyne_op:get_from_table(Bs,Code,[]) of		{'EXIT', Cause} ->		    CursorOwner ! {fail, self(), Cause},		    simple_loop1(CursorOwner, Bs, Code, Spec);		Solutions ->		    simple_loop(filter_result(Spec2,Solutions),				Tid, CursorOwner, Spec2),		    simple_loop1(CursorOwner, Bs, Code, Spec2)	    end;	{activity_ended, Activity, ReplyTo} ->	    mnesia:put_activity_id(Activity),	    ReplyTo ! {activity_ended, Activity, self()},	    simple_loop1(CursorOwner, Bs, Code, Spec);	{attention,CursorOwner} ->	    simple_loop1(CursorOwner, Bs, Code, Spec)    end.    simple_loop2(CursorOwner, Solutions, Spec) ->    ?debugmsg(1,	      "simple_loop2(CursorOwner=~w, Solutions=~w, Spec=~w)\n",	      [CursorOwner, Solutions, Spec]	     ),    receive	{init_collector, CursorOwner, Tid, Nprefetch} ->	    case Spec#spec.table of		undefined -> true;		_ -> ets:delete (Spec#spec.table)	    end,	    Spec2 = Spec#spec{table=ets:new(filter3,[set,public])},	    mnesia:put_activity_id(Tid),	    simple_loop(Solutions, Tid, CursorOwner, Spec2),	    simple_loop2(CursorOwner, Solutions, Spec2);	{activity_ended, Activity, ReplyTo} ->	    mnesia:put_activity_id(Activity),	    ReplyTo ! {activity_ended, Activity, self()},	    simple_loop2(CursorOwner, Solutions, Spec);	{attention,CursorOwner} ->	    simple_loop2(CursorOwner, Solutions, Spec)    end.complicated_loop(CursorOwner, OptQuery, Spec) ->    ?debugmsg(1,	      "complicated_loop(CursorOwner=~w, OptQuery=~w, Spec=~w)\n",	      [CursorOwner, OptQuery, Spec]	     ),    %% process_flag(trap_exit, true),    {FirstPid,Count} = setup_query(OptQuery#optimizer_result.code, self()),    complicated_loop_cont(FirstPid, Count,CursorOwner, OptQuery, Spec).complicated_loop_cont(FirstPid, Count,CursorOwner, OptQuery, Spec) ->    receive	{init_collector, CursorOwner, Tid, Nprefetch} ->	    case Spec#spec.table of		undefined -> true;		_ -> ets:delete (Spec#spec.table)	    end,	    Spec2 = Spec#spec{table=ets:new(filter4,[set,public])},	    mnesia:put_activity_id(Tid),	    FirstPid ! {tid,Tid, self()},	    EndCounters = [mnemosyne_op:new_end_token(1)],	    FirstPid ! {bss, 			[OptQuery#optimizer_result.common_bs], 			Nprefetch, EndCounters, [], {true, self()}},	    Reply = no_pending([], Count,  FirstPid,			       Tid, CursorOwner, Spec2, EndCounters, 			       {[],[],[]}),	    CursorOwner ! Reply,	    complicated_loop_cont(FirstPid, Count,CursorOwner, 				  OptQuery, Spec2);	{activity_ended, Activity, ReplyTo} ->	    mnesia:put_activity_id(Activity),	    Message = {tid, Activity},	    FirstPid ! Message,	    flush_until(Message),	    ReplyTo ! {activity_ended, Activity, self()},	    complicated_loop_cont(FirstPid, Count,CursorOwner, OptQuery, Spec);		{attention,CursorOwner} ->	    complicated_loop_cont(FirstPid, Count, CursorOwner, OptQuery, Spec)    end.flush_until(Message) ->    receive	Message -> ok;	Other -> flush_until(Message)    end.simple_loop(Answers, Tid, CursorOwner, Spec) ->    ?debugmsg(1, "simple_loop Answers=~w\n", [Answers]),    receive	{get, CursorOwner, Tid, Nmin, Nmax} ->	    {First, Last} = mnemosyne_op:split_list(Nmax, Answers),	    CursorOwner ! {answer, self(), First},	    simple_loop(Last, Tid, CursorOwner, Spec);	{activity_ended, Activity, ReplyTo} ->	    mnesia:put_activity_id(Activity),	    ReplyTo ! {activity_ended, Activity, self()},	    ok;	{fail, SomePid, Cause} ->	    CursorOwner ! {fail, self(), Cause},	    ok;	{attention,CursorOwner} ->	    ok    end.	ask_for_more(Pid, N, EndCounters) when is_pid(Pid) ->    NewEndToken = mnemosyne_op:new_end_token(1),     Pid ! {bss, [], N, [NewEndToken], [], {true, self()}},      [NewEndToken | EndCounters].    send_answers(Pid, Acc, N) ->    {First, Last} = mnemosyne_op:split_list(N, Acc),    Pid ! {answer, self(), First},    Last.    no_pending(Answers, Count, FirstPid, CTid, CursorOwner, Spec, 	   EndCounters, {Tid,PidL,EndC}) ->    ?debugmsg(1, "no_pending Answers=~w, EndCounters=~w\n",	      [Answers, EndCounters]),    receive	{get, CursorOwner, CTid, Nmin, Nmax} ->	    %% Answers requested	    Nanswers = length(Answers),	    if Nmin =< Nanswers ->		    %% Has enough answers already.		    no_pending(send_answers(CursorOwner,Answers,Nmax), Count,			       FirstPid, CTid, CursorOwner, Spec, EndCounters,			       {Tid, PidL, EndC});	      Nanswers < Nmin ->		    %% Has NOT enough answers available, ask for more		    pending(Answers, Count, FirstPid, CTid, 				    CursorOwner, Spec,				    ask_for_more(FirstPid,Nmax,EndCounters),				    Nmin, Nmax, {Tid, PidL, EndC})

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?