mnesia_controller.erl
来自「OTP是开放电信平台的简称」· ERL 代码 · 共 2,103 行 · 第 1/5 页
ERL
2,103 行
no -> {error, {node_not_running, node()}}; yes -> Pid = spawn_link(?MODULE,connect_nodes2,[self(),Ns]), receive {?MODULE, Pid, Res, New} -> case Res of ok -> mnesia_lib:add_list(extra_db_nodes, New), {ok, New}; {aborted, {throw, Str}} when list(Str) -> %%mnesia_recover:disconnect_nodes(New), {error, {merge_schema_failed, lists:flatten(Str)}}; Else -> {error, Else} end; {'EXIT', Pid, Reason} -> {error, Reason} end end.connect_nodes2(Father, Ns) -> Current = val({current, db_nodes}), abcast([node()|Ns], {merging_schema, node()}), {NewC, OldC} = mnesia_recover:connect_nodes(Ns), Connected = NewC ++OldC, New1 = mnesia_lib:intersect(Ns, Connected), New = New1 -- Current, process_flag(trap_exit, true), Res = try_merge_schema(New), Msg = {schema_is_merged, [], late_merge, []}, multicall([node()|Ns], Msg), After = val({current, db_nodes}), Father ! {?MODULE, self(), Res, mnesia_lib:intersect(Ns,After)}, unlink(Father), ok. %% Merge the local schema with the schema on other nodes.%% But first we must let all processes that want to force%% load tables wait until the schema merge is done.merge_schema() -> AllNodes = mnesia_lib:all_nodes(), case try_merge_schema(AllNodes) of ok -> schema_is_merged(); {aborted, {throw, Str}} when list(Str) -> fatal("Failed to merge schema: ~s~n", [Str]); Else -> fatal("Failed to merge schema: ~p~n", [Else]) end.try_merge_schema(Nodes) -> case mnesia_schema:merge_schema() of {atomic, not_merged} -> %% No more nodes that we need to merge the schema with ok; {atomic, {merged, OldFriends, NewFriends}} -> %% Check if new nodes has been added to the schema Diff = mnesia_lib:all_nodes() -- [node() | Nodes], mnesia_recover:connect_nodes(Diff), %% Tell everybody to adopt orphan tables im_running(OldFriends, NewFriends), im_running(NewFriends, OldFriends), try_merge_schema(Nodes); {atomic, {"Cannot get cstructs", Node, Reason}} -> dbg_out("Cannot get cstructs, Node ~p ~p~n", [Node, Reason]), timer:sleep(1000), % Avoid a endless loop look alike try_merge_schema(Nodes); Other -> Other end.im_running(OldFriends, NewFriends) -> abcast(OldFriends, {im_running, node(), NewFriends}).schema_is_merged() -> MsgTag = schema_is_merged, SafeLoads = initial_safe_loads(), %% At this point we do not know anything about %% which tables that the other nodes already %% has loaded and therefore we let the normal %% processing of the loader_queue take care %% of it, since we at that time point will %% know the whereabouts. We rely on the fact %% that all nodes tells each other directly %% when they have loaded a table and are %% willing to share it. try_schedule_late_disc_load(SafeLoads, initial, MsgTag).cast(Msg) -> case whereis(?SERVER_NAME) of undefined ->{error, {node_not_running, node()}}; Pid -> gen_server:cast(Pid, Msg) end.abcast(Nodes, Msg) -> gen_server:abcast(Nodes, ?SERVER_NAME, Msg).unsafe_call(Msg) -> case whereis(?SERVER_NAME) of undefined -> {error, {node_not_running, node()}}; Pid -> gen_server:call(Pid, Msg, infinity) end.call(Msg) -> case whereis(?SERVER_NAME) of undefined -> {error, {node_not_running, node()}}; Pid -> link(Pid), Res = gen_server:call(Pid, Msg, infinity), unlink(Pid), %% We get an exit signal if server dies receive {'EXIT', Pid, _Reason} -> {error, {node_not_running, node()}} after 0 -> ignore end, Res end.remote_call(Node, Func, Args) -> case catch gen_server:call({?MODULE, Node}, {Func, Args, self()}, infinity) of {'EXIT', Error} -> {error, Error}; Else -> Else end. multicall(Nodes, Msg) -> {Good, Bad} = gen_server:multi_call(Nodes, ?MODULE, Msg, infinity), PatchedGood = [Reply || {_Node, Reply} <- Good], {PatchedGood, Bad}. %% Make the replies look like rpc:multicalls..%% rpc:multicall(Nodes, ?MODULE, call, [Msg]).%%%----------------------------------------------------------------------%%% Callback functions from gen_server%%%----------------------------------------------------------------------%%----------------------------------------------------------------------%% Func: init/1%% Returns: {ok, State} |%% {ok, State, Timeout} |%% {stop, Reason}%%----------------------------------------------------------------------init([Parent]) -> process_flag(trap_exit, true), mnesia_lib:verbose("~p starting: ~p~n", [?SERVER_NAME, self()]), %% Handshake and initialize transaction recovery %% for new nodes detected in the schema All = mnesia_lib:all_nodes(), Diff = All -- [node() | val(original_nodes)], mnesia_lib:unset(original_nodes), mnesia_recover:connect_nodes(Diff), Interval = mnesia_monitor:get_env(dump_log_time_threshold), Msg = {async_dump_log, time_threshold}, {ok, Ref} = timer:send_interval(Interval, Msg), mnesia_dumper:start_regulator(), Empty = gb_trees:empty(), {ok, #state{supervisor = Parent, dump_log_timer_ref = Ref, loader_queue = Empty, late_loader_queue = Empty}}.%%----------------------------------------------------------------------%% Func: handle_call/3%% Returns: {reply, Reply, State} |%% {reply, Reply, State, Timeout} |%% {noreply, State} |%% {noreply, State, Timeout} |%% {stop, Reason, Reply, State} | (terminate/2 is called)%% {stop, Reason, Reply, State} (terminate/2 is called)%%----------------------------------------------------------------------handle_call({sync_dump_log, InitBy}, From, State) -> Worker = #dump_log{initiated_by = InitBy, opt_reply_to = From }, State2 = add_worker(Worker, State), noreply(State2);handle_call(wait_for_schema_commit_lock, From, State) -> Worker = #schema_commit_lock{owner = From}, State2 = add_worker(Worker, State), noreply(State2);handle_call(block_controller, From, State) -> Worker = #block_controller{owner = From}, State2 = add_worker(Worker, State), noreply(State2);handle_call({update,Fun}, From, State) -> Res = (catch Fun()), reply(From, Res), noreply(State);handle_call(get_cstructs, From, State) -> Tabs = val({schema, tables}), Cstructs = [val({T, cstruct}) || T <- Tabs], Running = val({current, db_nodes}), reply(From, {cstructs, Cstructs, Running}), noreply(State);handle_call({schema_is_merged, [], late_merge, []}, From, State = #state{schema_is_merged = Merged}) -> case Merged of {false, Node} when Node == node(From) -> Msgs = State#state.early_msgs, State1 = State#state{early_msgs = [], schema_is_merged = true}, handle_early_msgs(lists:reverse(Msgs), State1); _ -> %% Ooops this came to early, before we have merged :-) %% or it came to late or from a node we don't care about reply(From, ignore), noreply(State) end;handle_call({schema_is_merged, TabsR, Reason, RemoteLoaders}, From, State) -> State2 = late_disc_load(TabsR, Reason, RemoteLoaders, From, State), %% Handle early messages Msgs = State2#state.early_msgs, State3 = State2#state{early_msgs = [], schema_is_merged = true}, handle_early_msgs(lists:reverse(Msgs), State3);handle_call(disc_load_intents,From,State = #state{loader_queue=LQ,late_loader_queue=LLQ}) -> LQTabs = gb_trees:keys(LQ), LLQTabs = gb_trees:keys(LLQ), ActiveTabs = lists:sort(mnesia_lib:local_active_tables()), reply(From, {ok, node(), ordsets:union([LQTabs,LLQTabs,ActiveTabs])}), noreply(State);handle_call({update_where_to_write, [add, Tab, AddNode], _From}, _Dummy, State) -> Current = val({current, db_nodes}), Res = case lists:member(AddNode, Current) and (State#state.schema_is_merged == true) of true -> mnesia_lib:add_lsort({Tab, where_to_write}, AddNode); false -> ignore end, {reply, Res, State};handle_call({add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From}, ReplyTo, State) -> KnownNode = lists:member(ToNode, val({current, db_nodes})), Merged = State#state.schema_is_merged, if KnownNode == false -> reply(ReplyTo, ignore), noreply(State); Merged == true -> Res = case ?catch_val({Tab, cstruct}) of {'EXIT', _} -> %% Tab deleted deleted; _ -> add_active_replica(Tab, ToNode, RemoteS, AccessMode) end, reply(ReplyTo, Res), noreply(State); true -> %% Schema is not merged Msg = {add_active_replica, [Tab, ToNode, RemoteS, AccessMode], From}, Msgs = State#state.early_msgs, reply(ReplyTo, ignore), %% Reply ignore and add data after schema merge noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]}) end;handle_call({unannounce_add_table_copy, [Tab, Node], From}, ReplyTo, State) -> KnownNode = lists:member(node(From), val({current, db_nodes})), Merged = State#state.schema_is_merged, if KnownNode == false -> reply(ReplyTo, ignore), noreply(State); Merged == true -> Res = unannounce_add_table_copy(Tab, Node), reply(ReplyTo, Res), noreply(State); true -> %% Schema is not merged Msg = {unannounce_add_table_copy, [Tab, Node], From}, Msgs = State#state.early_msgs, reply(ReplyTo, ignore), %% Reply ignore and add data after schema merge %% Set ReplyTO to undefined so we don't reply twice noreply(State#state{early_msgs = [{call, Msg, undefined} | Msgs]}) end;handle_call({net_load, Tab, Cs}, From, State) -> State2 = case State#state.schema_is_merged of true -> Worker = #net_load{table = Tab, opt_reply_to = From, reason = {dumper,add_table_copy}, cstruct = Cs }, add_worker(Worker, State); false -> reply(From, {not_loaded, schema_not_merged}), State end, noreply(State2);handle_call(Msg, From, State) when State#state.schema_is_merged /= true -> %% Buffer early messages Msgs = State#state.early_msgs, noreply(State#state{early_msgs = [{call, Msg, From} | Msgs]});handle_call({late_disc_load, Tabs, Reason, RemoteLoaders}, From, State) -> State2 = late_disc_load(Tabs, Reason, RemoteLoaders, From, State), noreply(State2);handle_call({unblock_table, Tab}, _Dummy, State) -> Var = {Tab, where_to_commit}, case val(Var) of {blocked, List} -> set(Var, List); % where_to_commit _ -> ignore end, {reply, ok, State};handle_call({block_table, [Tab], From}, _Dummy, State) -> case lists:member(node(From), val({current, db_nodes})) of true -> block_table(Tab); false -> ignore end, {reply, ok, State};handle_call({check_w2r, _Node, Tab}, _From, State) -> {reply, val({Tab, where_to_read}), State};handle_call({add_other, Who}, _From, State = #state{others=Others0}) -> Others = [Who|Others0], {reply, ok, State#state{others=Others}};handle_call({del_other, Who}, _From, State = #state{others=Others0}) -> Others = lists:delete(Who, Others0), {reply, ok, State#state{others=Others}}; handle_call(Msg, _From, State) -> error("~p got unexpected call: ~p~n", [?SERVER_NAME, Msg]), noreply(State).late_disc_load(TabsR, Reason, RemoteLoaders, From, State = #state{loader_queue = LQ, late_loader_queue = LLQ}) -> verbose("Intend to load tables: ~p~n", [TabsR]), ?eval_debug_fun({?MODULE, late_disc_load}, [{tabs, TabsR}, {reason, Reason}, {loaders, RemoteLoaders}]), reply(From, queued), %% RemoteLoaders is a list of {ok, Node, Tabs} tuples %% Remove deleted tabs and queued/loaded LocalTabs = gb_sets:from_ordset(lists:sort(mnesia_lib:val({schema,local_tables}))), Filter = fun(TabInfo0, Acc) -> TabInfo = {Tab,_} = case TabInfo0 of {_,_} -> TabInfo0; TabN -> {TabN,Reason} end, case gb_sets:is_member(Tab, LocalTabs) of true -> case ?catch_val({Tab, where_to_read}) == node() of true -> Acc; false -> case gb_trees:is_defined(Tab,LQ) of true -> Acc; false -> [TabInfo | Acc] end end; false -> Acc end end, Tabs = lists:foldl(Filter, [], TabsR), Nodes = val({current, db_nodes}), LateQueue = late_loaders(Tabs, RemoteLoaders, Nodes, LLQ), State#state{late_loader_queue = LateQueue}. late_loaders([{Tab, Reason} | Tabs], RemoteLoaders, Nodes, LLQ) -> case gb_trees:is_defined(Tab, LLQ) of false -> LoadNodes = late_load_filter(RemoteLoaders, Tab, Nodes, []), case LoadNodes of [] -> cast({disc_load, Tab, Reason}); % Ugly cast _ -> ignore end, LateLoad = #late_load{table=Tab,loaders=LoadNodes,reason=Reason}, late_loaders(Tabs, RemoteLoaders, Nodes, gb_trees:insert(Tab,LateLoad,LLQ)); true -> late_loaders(Tabs, RemoteLoaders, Nodes, LLQ) end;late_loaders([], _RemoteLoaders, _Nodes, LLQ) -> LLQ.late_load_filter([{error, _} | RemoteLoaders], Tab, Nodes, Acc) -> late_load_filter(RemoteLoaders, Tab, Nodes, Acc);late_load_filter([{badrpc, _} | RemoteLoaders], Tab, Nodes, Acc) -> late_load_filter(RemoteLoaders, Tab, Nodes, Acc);late_load_filter([RL | RemoteLoaders], Tab, Nodes, Acc) -> {ok, Node, Intents} = RL, Access = val({Tab, access_mode}), LocalC = val({Tab, local_content}), StillActive = lists:member(Node, Nodes), RemoteIntent = lists:member(Tab, Intents), if
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?