mnemosyne_exec.erl
来自「OTP是开放电信平台的简称」· ERL 代码 · 共 672 行 · 第 1/2 页
ERL
672 行
%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved via the world wide web at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
%% AB. All Rights Reserved.''
%%
%%     $Id$
%%
-module(mnemosyne_exec).

-export([mk_patterns/1]).
-export([setup/2]).

-export([setup_collector_and_query/1, init_query/2, get_answers/3,
         kill_processes/1]).

-export([start_simple_loop1/4, start_simple_loop2/3, start_complicated_loop/3]). %% spawned
%%-export([simple_loop1/4, simple_loop2/3, complicated_loop/3]). %% spawned

%%-define(debug,42).
-include("mnemosyne_debug.hrl").
-include("mnemosyne_internal_form.hrl").

%%%================================================================
%%%             Exports

%%% eval_collect(Nsols, Query, Fun) where
%%% Fun = fun(Accu,P) is foldl'd over all bound patterns and the result is
%%%     returned
%%% Ex1.   fun(Accu,P) ->
%%%              [P|Accu]
%%%        end
%%%   collects all solutions.
%%% Ex2.   fun(Accu,P) ->
%%%              m:f(P),
%%%              Accu
%%%        end
%%%   applies m:f on every solution but does not return any result

%% Per-collector state.
%%   func    - fold fun (Accu, P) -> Accu2 applied to the solutions
%%   table   - ETS table created on each init_collector, or undefined
%%   pattern - result pattern taken from the optimizer result
-record(spec, {func,
               table = undefined,
               pattern
              }).

%%%----------------------------------------------------------------
%%%   Cursor handling
%%%
%% Supposed to be called as:
%%    H = setup_collector_and_query(Q),
%%    ....
%%    mnesia:transaction(
%%       fun() ->
%%           Crsr = init_query(H [,Number_of_answers_to_prefetch]),
%%           ....
%%           loop over L1 = get_answers(Crsr [,Min,Max]),
%%           ....
%%       end),
%%    ....
%%    kill_processes(H)

%% Spawns (linked) the collector process appropriate for the optimizer
%% result and returns the handle {query_collector, CollectorPid, CursorOwner},
%% where CursorOwner is the calling process.
%%
%% The collector flavour is selected by #optimizer_result.how_to_eval:
%%   mnesia_match_build          -> start_simple_loop1 (single code element)
%%   query_eval with empty code  -> start_simple_loop2 (pattern is the answer)
%%   query_eval otherwise        -> start_complicated_loop
setup_collector_and_query(OptQuery) when is_record(OptQuery,optimizer_result) ->
    Spec = #spec{func = fun(Accu,P) -> [P|Accu] end,
                 pattern = OptQuery#optimizer_result.pattern},
    CollectorPid =
        case OptQuery#optimizer_result.how_to_eval of
            mnesia_match_build -> %% fetch all solutions using mnesia:match_object
                [Code] = OptQuery#optimizer_result.code,
                Bs = OptQuery#optimizer_result.common_bs,
                ?debugmsg(1, "Using mnesia:match_object(~w)\n", [Code#pred_sym.pattern]),
                spawn_link(?MODULE, start_simple_loop1, [self(),Bs,Code,Spec]);

            query_eval when OptQuery#optimizer_result.code==[] ->
                %% has the single answer as the pattern
                Solutions = [OptQuery#optimizer_result.pattern],
                spawn_link(?MODULE, start_simple_loop2, [self(),Solutions,Spec]);

            query_eval -> %% Must start all processes
                spawn_link(?MODULE, start_complicated_loop, [self(),OptQuery,Spec])
        end,
    CursorOwner = self(),
    {query_collector, CollectorPid, CursorOwner}.

%%%----
%% Binds the collector to the caller's current mnesia activity and returns
%% the cursor {exec_cursor, CollectorPid, CursorOwner, Tid}.
%% N is forwarded to the collector in the init_collector message.
%% Throws {'EXIT',{aborted,Reason}} when legal_pid_tid/3 rejects the call.
init_query({query_collector,CollectorPid,CursorOwner}, N) ->
    Tid = mnesia:get_activity_id(),
    legal_pid_tid(CollectorPid, CursorOwner, Tid),
    %% 'attention' makes a collector that is still serving an earlier
    %% cursor return to its top-level loop before it is re-initialised.
    CollectorPid ! {attention,CursorOwner},
    CollectorPid ! {init_collector,CursorOwner,Tid,N},
    {exec_cursor, CollectorPid, CursorOwner, Tid}.

%%%----
%% Requests answers from the collector (passing Nmin and Nmax along) and
%% blocks until a reply arrives:
%%   {answer,CollectorPid,Answers} -> returns Answers
%%   {fail,_,Cause}                -> throws {'EXIT',Cause}
%%   {mnesia_down,Node}            -> mnesia:abort({node_not_running,Node})
get_answers({exec_cursor,CollectorPid,CursorOwner,Tid}, Nmin, Nmax) ->
    legal_pid_tid(CollectorPid, CursorOwner, Tid),
    CollectorPid ! {get, CursorOwner, Tid, Nmin, Nmax},
    receive
        {answer,CollectorPid,Answers} ->
            Answers;

        {fail,_,Cause} ->
            throw({'EXIT',Cause});

        {mnesia_down, Node} ->
            mnesia:abort({node_not_running, Node})
    end.

%% Validates that the caller may use the cursor. Returns ok, or throws
%% {'EXIT',{aborted,Reason}} with Reason being, in order of checking:
%%   not_cursor_owner  - caller is not CursorOwner
%%   no_transaction    - no current mnesia activity
%%   wrong_transaction - current activity differs from Tid0
%%   no_collector      - process_info/1 reports the collector as gone
legal_pid_tid(CollectorPid, CursorOwner, Tid0) ->
    Tid = mnesia:get_activity_id(),
    CollectorProcessInfo = process_info(CollectorPid),
    if
        CursorOwner =/= self() ->
            throw({'EXIT',{aborted,not_cursor_owner}});
        Tid == undefined ->
            throw({'EXIT',{aborted,no_transaction}});
        Tid =/= Tid0 ->
            throw({'EXIT',{aborted,wrong_transaction}});
        CollectorProcessInfo == undefined ->
            throw({'EXIT',{aborted,no_collector}});
        true ->
            ok
    end.

%%%----
%% Unlinks from and kills the collector with exit reason 'deleted'.
%% Only the cursor owner may do this; any other caller gets
%% {'EXIT',{aborted,not_cursor_owner}} thrown.
kill_processes({query_collector,CollectorPid,CursorOwner}) ->
    if
        CursorOwner =/= self() ->
            throw({'EXIT',{aborted,not_cursor_owner}});
        true ->
            unlink(CollectorPid),
            exit(CollectorPid, deleted)
    end.

%%%----------------
%% Exported spawn entry points; each simply enters the matching loop.
start_simple_loop1(Self,Bs,Code,Spec) ->
    simple_loop1(Self,Bs,Code,Spec).

start_simple_loop2(Self,Solutions,Spec) ->
    simple_loop2(Self,Solutions,Spec).

start_complicated_loop(Self,OptQuery,Spec) ->
    complicated_loop(Self,OptQuery,Spec).

%% Top-level loop of the collector used for mnesia_match_build queries.
%% On init_collector it replaces the ETS table in Spec, adopts the
%% owner's activity id, fetches all solutions with
%% mnemosyne_op:get_from_table/3 and serves them through simple_loop/4.
simple_loop1(CursorOwner, Bs, Code, Spec) ->
    ?debugmsg(1, "simple_loop1(CursorOwner=~w, Bs=~w, Code=~w, Spec=~w)\n",
              [CursorOwner, Bs, Code, Spec]),
    receive
        {init_collector, CursorOwner, Tid, _Nprefetch} ->
            case Spec#spec.table of
                undefined -> true;
                _ -> ets:delete(Spec#spec.table)
            end,
            Spec2 = Spec#spec{table=ets:new(filter2,[set,public])},
            mnesia:put_activity_id(Tid),
            %% Old-style catch is kept on purpose: exits and errors become
            %% {'EXIT',Cause}, everything else is treated as the solutions.
            case catch mnemosyne_op:get_from_table(Bs,Code,[]) of
                {'EXIT', Cause} ->
                    CursorOwner ! {fail, self(), Cause},
                    %% Continue with Spec2, not Spec: the table held by Spec
                    %% was deleted above, so looping with Spec would keep a
                    %% dangling table id (ets:delete/1 fails with badarg on
                    %% the next init_collector) and orphan the new table.
                    simple_loop1(CursorOwner, Bs, Code, Spec2);
                Solutions ->
                    simple_loop(filter_result(Spec2,Solutions),
                                Tid, CursorOwner, Spec2),
                    simple_loop1(CursorOwner, Bs, Code, Spec2)
            end;

        {activity_ended, Activity, ReplyTo} ->
            mnesia:put_activity_id(Activity),
            ReplyTo ! {activity_ended, Activity, self()},
            simple_loop1(CursorOwner, Bs, Code, Spec);

        {attention,CursorOwner} ->
            simple_loop1(CursorOwner, Bs, Code, Spec)
    end.

%% Top-level loop of the collector used when the query has no code: the
%% pre-computed Solutions list is served as-is through simple_loop/4.
%% On init_collector the ETS table in Spec is replaced and the owner's
%% activity id is adopted; the prefetch count is not used here.
simple_loop2(CursorOwner, Solutions, Spec) ->
    ?debugmsg(1, "simple_loop2(CursorOwner=~w, Solutions=~w, Spec=~w)\n",
              [CursorOwner, Solutions, Spec]),
    receive
        {init_collector, CursorOwner, Tid, _Nprefetch} ->
            case Spec#spec.table of
                undefined -> true;
                _ -> ets:delete(Spec#spec.table)
            end,
            Spec2 = Spec#spec{table=ets:new(filter3,[set,public])},
            mnesia:put_activity_id(Tid),
            simple_loop(Solutions, Tid, CursorOwner, Spec2),
            simple_loop2(CursorOwner, Solutions, Spec2);

        {activity_ended, Activity, ReplyTo} ->
            mnesia:put_activity_id(Activity),
            ReplyTo ! {activity_ended, Activity, self()},
            simple_loop2(CursorOwner, Solutions, Spec);

        {attention,CursorOwner} ->
            simple_loop2(CursorOwner, Solutions, Spec)
    end.

%% Entry of the collector used for general queries: sets up the query
%% processes via setup_query/2 and then enters complicated_loop_cont/5.
complicated_loop(CursorOwner, OptQuery, Spec) ->
    ?debugmsg(1, "complicated_loop(CursorOwner=~w, OptQuery=~w, Spec=~w)\n",
              [CursorOwner, OptQuery, Spec]),
    %% process_flag(trap_exit, true),
    {FirstPid,Count} = setup_query(OptQuery#optimizer_result.code, self()),
    complicated_loop_cont(FirstPid, Count,CursorOwner, OptQuery, Spec).

%% Top-level loop of the general collector.
%%   init_collector - replaces the ETS table, hands the Tid and the initial
%%                    bindings to FirstPid, runs no_pending/8 and sends its
%%                    result to the cursor owner.
%%   activity_ended - sends {tid, Activity} to FirstPid, discards every
%%                    message until that same term is received back, then
%%                    acknowledges to ReplyTo.
%%   attention      - ignored at this level.
complicated_loop_cont(FirstPid, Count,CursorOwner, OptQuery, Spec) ->
    receive
        {init_collector, CursorOwner, Tid, Nprefetch} ->
            case Spec#spec.table of
                undefined -> true;
                _ -> ets:delete(Spec#spec.table)
            end,
            Spec2 = Spec#spec{table=ets:new(filter4,[set,public])},
            mnesia:put_activity_id(Tid),
            FirstPid ! {tid,Tid, self()},
            EndCounters = [mnemosyne_op:new_end_token(1)],
            FirstPid ! {bss, [OptQuery#optimizer_result.common_bs],
                        Nprefetch, EndCounters, [], {true, self()}},
            Reply = no_pending([], Count, FirstPid, Tid, CursorOwner, Spec2,
                               EndCounters, {[],[],[]}),
            CursorOwner ! Reply,
            complicated_loop_cont(FirstPid, Count,CursorOwner, OptQuery, Spec2);

        {activity_ended, Activity, ReplyTo} ->
            mnesia:put_activity_id(Activity),
            Message = {tid, Activity},
            FirstPid ! Message,
            flush_until(Message),
            ReplyTo ! {activity_ended, Activity, self()},
            complicated_loop_cont(FirstPid, Count,CursorOwner, OptQuery, Spec);

        {attention,CursorOwner} ->
            complicated_loop_cont(FirstPid, Count, CursorOwner, OptQuery, Spec)
    end.

%% Drops every incoming message until one equal to Message arrives.
%% Message is already bound, so the first clause matches only that term.
flush_until(Message) ->
    receive
        Message ->
            ok;
        _Other ->
            flush_until(Message)
    end.

%% Serves 'get' requests from the in-memory list Answers. Each request is
%% answered with the first part returned by
%% mnemosyne_op:split_list(Nmax, Answers); the remainder is kept for the
%% next request. The requested minimum is not used here.
%% Returns ok when the activity ends, a failure is reported (it is forwarded
%% to the cursor owner), or the owner sends 'attention'.
simple_loop(Answers, Tid, CursorOwner, Spec) ->
    ?debugmsg(1, "simple_loop Answers=~w\n", [Answers]),
    receive
        {get, CursorOwner, Tid, _Nmin, Nmax} ->
            {First, Last} = mnemosyne_op:split_list(Nmax, Answers),
            CursorOwner ! {answer, self(), First},
            simple_loop(Last, Tid, CursorOwner, Spec);

        {activity_ended, Activity, ReplyTo} ->
            mnesia:put_activity_id(Activity),
            ReplyTo ! {activity_ended, Activity, self()},
            ok;

        {fail, _SomePid, Cause} ->
            CursorOwner ! {fail, self(), Cause},
            ok;

        {attention,CursorOwner} ->
            ok
    end.

%% Sends a 'bss' request for N more answers to Pid, tagged with a fresh end
%% token, and returns EndCounters with that token prepended.
ask_for_more(Pid, N, EndCounters) when is_pid(Pid) ->
    NewEndToken = mnemosyne_op:new_end_token(1),
    Pid ! {bss, [], N, [NewEndToken], [], {true, self()}},
    [NewEndToken | EndCounters].

%% Sends the first part of mnemosyne_op:split_list(N, Acc) to Pid as an
%% 'answer' message and returns the remaining part.
send_answers(Pid, Acc, N) ->
    {First, Last} = mnemosyne_op:split_list(N, Acc),
    Pid ! {answer, self(), First},
    Last.

%% NOTE(review): the rest of no_pending/8 lies outside the visible excerpt;
%% the fragment below is reproduced unchanged apart from layout.
no_pending(Answers, Count, FirstPid, CTid, CursorOwner, Spec, EndCounters,
           {Tid,PidL,EndC}) ->
    ?debugmsg(1, "no_pending Answers=~w, EndCounters=~w\n",
              [Answers, EndCounters]),
    receive
        {get, CursorOwner, CTid, Nmin, Nmax} -> %% Answers requested
            Nanswers = length(Answers),
            if
                Nmin =< Nanswers -> %% Has enough answers already.
                    no_pending(send_answers(CursorOwner,Answers,Nmax),
                               Count, FirstPid, CTid, CursorOwner, Spec, EndCounters,
                               {Tid, PidL, EndC});

                Nanswers < Nmin ->
                    %% Has NOT enough answers available, ask for more
                    pending(Answers, Count, FirstPid, CTid, CursorOwner, Spec,
                            ask_for_more(FirstPid,Nmax,EndCounters), Nmin, Nmax,
                            {Tid, PidL, EndC})
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?