mnesia_controller.erl

来自「OTP是开放电信平台的简称」· ERL 代码 · 共 2,103 行 · 第 1/5 页

ERL
2,103
字号
	no ->
	    {error, {node_not_running, node()}};
	yes ->
	    %% Do the schema merge in a linked helper process so a failure
	    %% there surfaces here either as the result message or as an
	    %% 'EXIT' signal, which we convert into an error tuple below.
	    Pid = spawn_link(?MODULE, connect_nodes2, [self(), Ns]),
	    receive
		{?MODULE, Pid, Res, New} ->
		    case Res of
			ok ->
			    mnesia_lib:add_list(extra_db_nodes, New),
			    {ok, New};
			{aborted, {throw, Str}} when is_list(Str) ->
			    %%mnesia_recover:disconnect_nodes(New),
			    {error, {merge_schema_failed, lists:flatten(Str)}};
			Else ->
			    {error, Else}
		    end;
		{'EXIT', Pid, Reason} ->
		    {error, Reason}
	    end
    end.

%% Worker spawned by connect_nodes/1: connects to the requested nodes,
%% merges the schema with the ones actually reached and reports the
%% outcome (plus the set of newly connected nodes) back to Father.
connect_nodes2(Father, Ns) ->
    Current = val({current, db_nodes}),
    abcast([node() | Ns], {merging_schema, node()}),
    {NewC, OldC} = mnesia_recover:connect_nodes(Ns),
    Connected = NewC ++ OldC,
    %% Only care about nodes we were asked for and are not already part of
    New1 = mnesia_lib:intersect(Ns, Connected),
    New = New1 -- Current,
    process_flag(trap_exit, true),
    Res = try_merge_schema(New),
    Msg = {schema_is_merged, [], late_merge, []},
    multicall([node() | Ns], Msg),
    After = val({current, db_nodes}),
    Father ! {?MODULE, self(), Res, mnesia_lib:intersect(Ns, After)},
    unlink(Father),
    ok.
%% Merge the local schema with the schema on other nodes.
%% But first we must let all processes that want to force
%% load tables wait until the schema merge is done.
merge_schema() ->
    AllNodes = mnesia_lib:all_nodes(),
    case try_merge_schema(AllNodes) of
	ok ->
	    schema_is_merged();
	{aborted, {throw, Str}} when is_list(Str) ->
	    fatal("Failed to merge schema: ~s~n", [Str]);
	Else ->
	    fatal("Failed to merge schema: ~p~n", [Else])
    end.

%% Repeatedly merge until there is nothing left to merge with.
try_merge_schema(Nodes) ->
    case mnesia_schema:merge_schema() of
	{atomic, not_merged} ->
	    %% No more nodes that we need to merge the schema with
	    ok;
	{atomic, {merged, OldFriends, NewFriends}} ->
	    %% Check if new nodes have been added to the schema
	    Diff = mnesia_lib:all_nodes() -- [node() | Nodes],
	    mnesia_recover:connect_nodes(Diff),
	    %% Tell everybody to adopt orphan tables
	    im_running(OldFriends, NewFriends),
	    im_running(NewFriends, OldFriends),
	    try_merge_schema(Nodes);
	{atomic, {"Cannot get cstructs", Node, Reason}} ->
	    dbg_out("Cannot get cstructs, Node ~p ~p~n", [Node, Reason]),
	    timer:sleep(1000), % Avoid an endless busy loop
	    try_merge_schema(Nodes);
	Other ->
	    Other
    end.

im_running(OldFriends, NewFriends) ->
    abcast(OldFriends, {im_running, node(), NewFriends}).

schema_is_merged() ->
    MsgTag = schema_is_merged,
    SafeLoads = initial_safe_loads(),

    %% At this point we do not know anything about
    %% which tables that the other nodes already
    %% has loaded and therefore we let the normal
    %% processing of the loader_queue take care
    %% of it, since we at that time point will
    %% know the whereabouts. We rely on the fact
    %% that all nodes tells each other directly
    %% when they have loaded a table and are
    %% willing to share it.
    try_schedule_late_disc_load(SafeLoads, initial, MsgTag).

%% Async message to the controller, if it is running.
cast(Msg) ->
    case whereis(?SERVER_NAME) of
	undefined -> {error, {node_not_running, node()}};
	Pid -> gen_server:cast(Pid, Msg)
    end.

abcast(Nodes, Msg) ->
    gen_server:abcast(Nodes, ?SERVER_NAME, Msg).

%% Like call/1 but without protection against a dying server.
unsafe_call(Msg) ->
    case whereis(?SERVER_NAME) of
	undefined -> {error, {node_not_running, node()}};
	Pid -> gen_server:call(Pid, Msg, infinity)
    end.

call(Msg) ->
    case whereis(?SERVER_NAME) of
	undefined ->
	    {error, {node_not_running, node()}};
	Pid ->
	    link(Pid),
	    Res = gen_server:call(Pid, Msg, infinity),
	    unlink(Pid),
	    %% We get an exit signal if server dies
            receive
                {'EXIT', Pid, _Reason} ->
                    {error, {node_not_running, node()}}
            after 0 ->
                    ignore
            end,
	    Res
    end.

remote_call(Node, Func, Args) ->
    case catch gen_server:call({?MODULE, Node}, {Func, Args, self()}, infinity) of
	{'EXIT', Error} ->
	    {error, Error};
	Else ->
	    Else
    end.

%% Make the replies look like rpc:multicall/4 replies.
multicall(Nodes, Msg) ->
    {Good, Bad} = gen_server:multi_call(Nodes, ?MODULE, Msg, infinity),
    PatchedGood = [Reply || {_Node, Reply} <- Good],
    {PatchedGood, Bad}.
%% Make the replies look like rpc:multicalls..
%%    rpc:multicall(Nodes, ?MODULE, call, [Msg]).

%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State}          |
%%          {ok, State, Timeout} |
%%          {stop, Reason}
%%----------------------------------------------------------------------
init([Parent]) ->
    process_flag(trap_exit, true),
    mnesia_lib:verbose("~p starting: ~p~n", [?SERVER_NAME, self()]),

    %% Handshake and initialize transaction recovery
    %% for new nodes detected in the schema
    All = mnesia_lib:all_nodes(),
    Diff = All -- [node() | val(original_nodes)],
    mnesia_lib:unset(original_nodes),
    mnesia_recover:connect_nodes(Diff),

    %% Periodically trigger a dump of the transaction log
    Interval = mnesia_monitor:get_env(dump_log_time_threshold),
    Msg = {async_dump_log, time_threshold},
    {ok, Ref} = timer:send_interval(Interval, Msg),
    mnesia_dumper:start_regulator(),

    Empty = gb_trees:empty(),
    {ok, #state{supervisor = Parent, dump_log_timer_ref = Ref,
		loader_queue = Empty,
		late_loader_queue = Empty}}.

%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State}          |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State}               |
%%          {noreply, State, Timeout}      |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%          {stop, Reason, Reply, State}     (terminate/2 is called)
%%----------------------------------------------------------------------
handle_call({sync_dump_log, InitBy}, From, State) ->
    %% Queue the dump as a worker; reply is sent when it has run
    Worker = #dump_log{initiated_by = InitBy,
		       opt_reply_to = From
		      },
    State2 = add_worker(Worker, State),
    noreply(State2);
handle_call(wait_for_schema_commit_lock, From, State) ->
    Worker =
#schema_commit_lock{owner = From},
    State2 = add_worker(Worker, State),
    noreply(State2);
handle_call(block_controller, From, State) ->
    Worker = #block_controller{owner = From},
    State2 = add_worker(Worker, State),
    noreply(State2);
handle_call({update,Fun}, From, State) ->
    %% Run an arbitrary fun inside the controller; errors are returned,
    %% not allowed to kill the server
    Res = (catch Fun()),
    reply(From, Res),
    noreply(State);
handle_call(get_cstructs, From, State) ->
    Tabs = val({schema, tables}),
    Cstructs = [val({T, cstruct}) || T <- Tabs],
    Running = val({current, db_nodes}),
    reply(From, {cstructs, Cstructs, Running}),
    noreply(State);
handle_call({schema_is_merged, [], late_merge, []}, From,
	    State = #state{schema_is_merged = Merged}) ->
    case Merged of
	{false, Node} when Node == node(From) ->
	    Msgs = State#state.early_msgs,
	    State1 = State#state{early_msgs = [], schema_is_merged = true},
	    handle_early_msgs(lists:reverse(Msgs), State1);
	_ ->
	    %% Ooops this came too early, before we have merged :-)
	    %% or it came too late or from a node we don't care about
	    reply(From, ignore),
	    noreply(State)
    end;
handle_call({schema_is_merged, TabsR, Reason, RemoteLoaders}, From, State) ->
    State2 = late_disc_load(TabsR, Reason, RemoteLoaders, From, State),

    %% Handle early messages
    Msgs = State2#state.early_msgs,
    State3 = State2#state{early_msgs = [], schema_is_merged = true},
    handle_early_msgs(lists:reverse(Msgs), State3);
handle_call(disc_load_intents,From,State = #state{loader_queue=LQ,late_loader_queue=LLQ}) ->
    LQTabs  = gb_trees:keys(LQ),
    LLQTabs = gb_trees:keys(LLQ),
    ActiveTabs = lists:sort(mnesia_lib:local_active_tables()),
    reply(From, {ok, node(), ordsets:union([LQTabs,LLQTabs,ActiveTabs])}),
    noreply(State);
handle_call({update_where_to_write, [add, Tab, AddNode], _From}, _Dummy, State) ->
    Current = val({current, db_nodes}),
    Res =
	%% andalso short-circuits; both operands are side-effect free
	case lists:member(AddNode, Current) andalso
	    (State#state.schema_is_merged == true) of
	    true ->
		mnesia_lib:add_lsort({Tab, where_to_write},
AddNode);
	    false ->
		ignore
	end,
    {reply, Res, State};
handle_call({add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From},
	    ReplyTo, State) ->
    KnownNode = lists:member(ToNode, val({current, db_nodes})),
    Merged = State#state.schema_is_merged,
    if
	KnownNode == false ->
	    reply(ReplyTo, ignore),
	    noreply(State);
	Merged == true ->
	    Res = case ?catch_val({Tab, cstruct}) of
		      {'EXIT', _} -> %% Tab deleted
			  deleted;
		      _ ->
			  add_active_replica(Tab, ToNode, RemoteS, AccessMode)
		  end,
	    reply(ReplyTo, Res),
	    noreply(State);
	true -> %% Schema is not merged
	    Msg = {add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From},
	    Msgs = State#state.early_msgs,
	    reply(ReplyTo, ignore), %% Reply ignore and add data after schema merge
	    noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]})
    end;
handle_call({unannounce_add_table_copy, [Tab, Node], From}, ReplyTo, State) ->
    KnownNode = lists:member(node(From), val({current, db_nodes})),
    Merged = State#state.schema_is_merged,
    if
	KnownNode == false ->
	    reply(ReplyTo, ignore),
	    noreply(State);
	Merged == true ->
	    Res = unannounce_add_table_copy(Tab, Node),
	    reply(ReplyTo, Res),
	    noreply(State);
	true -> %% Schema is not merged
	    Msg = {unannounce_add_table_copy, [Tab, Node], From},
	    Msgs = State#state.early_msgs,
	    reply(ReplyTo, ignore), %% Reply ignore and add data after schema merge
	    %% Set ReplyTo to undefined so we don't reply twice
	    noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]})
    end;
handle_call({net_load, Tab, Cs}, From, State) ->
    State2 =
	case State#state.schema_is_merged of
	    true ->
		Worker = #net_load{table = Tab,
				   opt_reply_to = From,
				   reason = {dumper,add_table_copy},
				   cstruct = Cs
				  },
		add_worker(Worker, State);
	    false ->
		reply(From, {not_loaded, schema_not_merged}),
		State
	end,
    noreply(State2);
handle_call(Msg, From, State) when
State#state.schema_is_merged /= true ->
    %% Buffer early messages
    Msgs = State#state.early_msgs,
    noreply(State#state{early_msgs = [{call, Msg, From} | Msgs]});
handle_call({late_disc_load, Tabs, Reason, RemoteLoaders}, From, State) ->
    State2 = late_disc_load(Tabs, Reason, RemoteLoaders, From, State),
    noreply(State2);
handle_call({unblock_table, Tab}, _Dummy, State) ->
    Var = {Tab, where_to_commit},
    case val(Var) of
	{blocked, List} ->
	    set(Var, List); % where_to_commit
	_ ->
	    ignore
    end,
    {reply, ok, State};
handle_call({block_table, [Tab], From}, _Dummy, State) ->
    %% Only honour block requests from nodes we currently know about
    case lists:member(node(From), val({current, db_nodes})) of
	true ->
	    block_table(Tab);
	false ->
	    ignore
    end,
    {reply, ok, State};
handle_call({check_w2r, _Node, Tab}, _From, State) ->
    {reply, val({Tab, where_to_read}), State};
handle_call({add_other, Who}, _From, State = #state{others=Others0}) ->
    Others = [Who|Others0],
    {reply, ok, State#state{others=Others}};
handle_call({del_other, Who}, _From, State = #state{others=Others0}) ->
    Others = lists:delete(Who, Others0),
    {reply, ok, State#state{others=Others}};
handle_call(Msg, _From, State) ->
    error("~p got unexpected call: ~p~n", [?SERVER_NAME, Msg]),
    noreply(State).

%% Queue disc loads for the given tables, preferring remote loaders
%% that have announced an intent to load them.
late_disc_load(TabsR, Reason, RemoteLoaders, From,
	       State = #state{loader_queue = LQ, late_loader_queue = LLQ}) ->
    verbose("Intend to load tables: ~p~n", [TabsR]),
    ?eval_debug_fun({?MODULE, late_disc_load},
		    [{tabs, TabsR},
		     {reason, Reason},
		     {loaders, RemoteLoaders}]),

    reply(From, queued),
    %% RemoteLoaders is a list of {ok, Node, Tabs} tuples

    %% Remove deleted tabs and queued/loaded
    LocalTabs = gb_sets:from_ordset(lists:sort(mnesia_lib:val({schema,local_tables}))),
    Filter = fun(TabInfo0, Acc) ->
		     %% Normalize to {Tab, Reason} form
		     TabInfo = {Tab,_} =
			 case TabInfo0 of
			     {_,_} -> TabInfo0;
			     TabN -> {TabN,Reason}
			 end,
		     case gb_sets:is_member(Tab, LocalTabs) of
			 true ->
			     case
?catch_val({Tab, where_to_read}) == node() of
				 true -> Acc;
				 false ->
				     %% Skip tables already queued for loading
				     case gb_trees:is_defined(Tab,LQ) of
					 true ->  Acc;
					 false -> [TabInfo | Acc]
				     end
			     end;
			 false -> Acc
		     end
	     end,
    Tabs = lists:foldl(Filter, [], TabsR),
    Nodes = val({current, db_nodes}),
    LateQueue = late_loaders(Tabs, RemoteLoaders, Nodes, LLQ),
    State#state{late_loader_queue = LateQueue}.

%% Add a #late_load entry per table not already in the late queue.
%% When no remote loader volunteers, fall back to a local disc load.
late_loaders([{Tab, Reason} | Tabs], RemoteLoaders, Nodes, LLQ) ->
    case gb_trees:is_defined(Tab, LLQ) of
	false ->
	    LoadNodes = late_load_filter(RemoteLoaders, Tab, Nodes, []),
	    case LoadNodes of
		[] ->  cast({disc_load, Tab, Reason}); % Ugly cast
		_ ->   ignore
	    end,
	    LateLoad = #late_load{table=Tab,loaders=LoadNodes,reason=Reason},
	    late_loaders(Tabs, RemoteLoaders, Nodes, gb_trees:insert(Tab,LateLoad,LLQ));
	true ->
	    late_loaders(Tabs, RemoteLoaders, Nodes, LLQ)
    end;
late_loaders([], _RemoteLoaders, _Nodes, LLQ) ->
    LLQ.

%% Keep only nodes that are still connected and intend to load Tab;
%% drop error/badrpc replies from the loader poll.
late_load_filter([{error, _} | RemoteLoaders], Tab, Nodes, Acc) ->
    late_load_filter(RemoteLoaders, Tab, Nodes, Acc);
late_load_filter([{badrpc, _} | RemoteLoaders], Tab, Nodes, Acc) ->
    late_load_filter(RemoteLoaders, Tab, Nodes, Acc);
late_load_filter([RL | RemoteLoaders], Tab, Nodes, Acc) ->
    {ok, Node, Intents} = RL,
    Access = val({Tab, access_mode}),
    LocalC = val({Tab, local_content}),
    StillActive = lists:member(Node, Nodes),
    RemoteIntent = lists:member(Tab, Intents),
    if

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?