mnesia_controller.erl
来自「OTP是开放电信平台的简称」· ERL 代码 · 共 2,103 行 · 第 1/5 页
ERL
2,103 行
Access == read_write, LocalC == false, StillActive == true, RemoteIntent == true -> Masters = mnesia_recover:get_master_nodes(Tab), case lists:member(Node, Masters) of true -> %% The other node is master node for %% the table, accept his load intent late_load_filter(RemoteLoaders, Tab, Nodes, [Node | Acc]); false when Masters == [] -> %% The table has no master nodes %% accept his load intent late_load_filter(RemoteLoaders, Tab, Nodes, [Node | Acc]); false -> %% Some one else is master node for %% the table, ignore his load intent late_load_filter(RemoteLoaders, Tab, Nodes, Acc) end; true -> late_load_filter(RemoteLoaders, Tab, Nodes, Acc) end;late_load_filter([], _Tab, _Nodes, Acc) -> Acc. %%----------------------------------------------------------------------%% Func: handle_cast/2%% Returns: {noreply, State} |%% {noreply, State, Timeout} |%% {stop, Reason, State} (terminate/2 is called)%%----------------------------------------------------------------------handle_cast({release_schema_commit_lock, _Owner}, State) -> if State#state.is_stopping == true -> {stop, shutdown, State}; true -> case State#state.dumper_queue of [#schema_commit_lock{}|Rest] -> [_Worker | Rest] = State#state.dumper_queue, State2 = State#state{dumper_pid = undefined, dumper_queue = Rest}, State3 = opt_start_worker(State2), noreply(State3); _ -> noreply(State) end end;handle_cast(unblock_controller, State) -> if State#state.is_stopping == true -> {stop, shutdown, State}; record(hd(State#state.dumper_queue), block_controller) -> [_Worker | Rest] = State#state.dumper_queue, State2 = State#state{dumper_pid = undefined, dumper_queue = Rest}, State3 = opt_start_worker(State2), noreply(State3) end;handle_cast({mnesia_down, Node}, State) -> maybe_log_mnesia_down(Node), mnesia_lib:del({current, db_nodes}, Node), mnesia_checkpoint:tm_mnesia_down(Node), Alltabs = val({schema, tables}), reconfigure_tables(Node, Alltabs), %% Done from (external point of view) mnesia_monitor:mnesia_down(?SERVER_NAME, Node), %% Fix if we are late_merging against the node that went down case State#state.schema_is_merged of {false, Node} -> spawn(?MODULE, call, [{schema_is_merged, [], late_merge, []}]); _ -> ignore end, %% Fix internal stuff LateQ = remove_loaders(Alltabs, Node, State#state.late_loader_queue), case get_senders(State) ++ get_loaders(State) of [] -> ignore; Senders -> lists:foreach(fun({Pid,_}) -> Pid ! {copier_done, Node} end, Senders) end, lists:foreach(fun(Pid) -> Pid ! {copier_done,Node} end, State#state.others), Remove = fun(ST) -> node(ST#send_table.receiver_pid) /= Node end, NewSenders = lists:filter(Remove, State#state.sender_queue), Early = remove_early_messages(State#state.early_msgs, Node), noreply(State#state{sender_queue = NewSenders, early_msgs = Early, late_loader_queue = LateQ });handle_cast({merging_schema, Node}, State) -> case State#state.schema_is_merged of false -> %% This comes from dynamic connect_nodes which are made %% after mnesia:start() and the schema_merge. ImANewKidInTheBlock = (val({schema, storage_type}) == ram_copies) andalso (mnesia_lib:val({schema, local_tables}) == [schema]), case ImANewKidInTheBlock of true -> %% I'm newly started ram_node.. noreply(State#state{schema_is_merged = {false, Node}}); false -> noreply(State) end; _ -> %% Already merging schema. noreply(State) end;handle_cast(Msg, State) when State#state.schema_is_merged /= true -> %% Buffer early messages Msgs = State#state.early_msgs, noreply(State#state{early_msgs = [{cast, Msg} | Msgs]});%% This must be done after schema_is_merged otherwise adopt_orphan%% might trigger a table load from wrong nodes as a result of that we don't %% know which tables we can load safly first.handle_cast({im_running, _Node, NewFriends}, State) -> Tabs = mnesia_lib:local_active_tables() -- [schema], Ns = mnesia_lib:intersect(NewFriends, val({current, db_nodes})), abcast(Ns, {adopt_orphans, node(), Tabs}), noreply(State);handle_cast({disc_load, Tab, Reason}, State) -> Worker = #disc_load{table = Tab, reason = Reason}, State2 = add_worker(Worker, State), noreply(State2);handle_cast(Worker, State) when record(Worker, send_table) -> State2 = add_worker(Worker, State), noreply(State2);handle_cast({sync_tabs, Tabs, From}, State) -> %% user initiated wait_for_tables handle_sync_tabs(Tabs, From), noreply(State);handle_cast({i_have_tab, Tab, Node}, State) -> case lists:member(Node, val({current, db_nodes})) of true -> State2 = node_has_tabs([Tab], Node, State), noreply(State2); false -> noreply(State) end;handle_cast({force_load_updated, Tab}, State) -> case val({Tab, active_replicas}) of [] -> %% No valid replicas noreply(State); [SomeNode | _] -> State2 = node_has_tabs([Tab], SomeNode, State), noreply(State2) end; handle_cast({master_nodes_updated, Tab, Masters}, State) -> Active = val({Tab, active_replicas}), Valid = case val({Tab, load_by_force}) of true -> Active; false -> if Masters == [] -> Active; true -> mnesia_lib:intersect(Masters, Active) end end, case Valid of [] -> %% No valid replicas noreply(State); [SomeNode | _] -> State2 = node_has_tabs([Tab], SomeNode, State), noreply(State2) end; handle_cast({adopt_orphans, Node, Tabs}, State) -> State2 = node_has_tabs(Tabs, Node, State), %% Register the other node as up and running mnesia_recover:log_mnesia_up(Node), verbose("Logging mnesia_up ~w~n",[Node]), mnesia_lib:report_system_event({mnesia_up, Node}), %% Load orphan tables LocalTabs = val({schema, local_tables}) -- [schema], Nodes = val({current, db_nodes}), {LocalOrphans, RemoteMasters} = orphan_tables(LocalTabs, Node, Nodes, [], []), Reason = {adopt_orphan, node()}, mnesia_late_loader:async_late_disc_load(node(), LocalOrphans, Reason), Fun = fun(N) -> RemoteOrphans = [Tab || {Tab, Ns} <- RemoteMasters, lists:member(N, Ns)], mnesia_late_loader:maybe_async_late_disc_load(N, RemoteOrphans, Reason) end, lists:foreach(Fun, Nodes), noreply(State2);handle_cast(Msg, State) -> error("~p got unexpected cast: ~p~n", [?SERVER_NAME, Msg]), noreply(State).handle_sync_tabs([Tab | Tabs], From) -> case val({Tab, where_to_read}) of nowhere -> case get({sync_tab, Tab}) of undefined -> put({sync_tab, Tab}, [From]); Pids -> put({sync_tab, Tab}, [From | Pids]) end; _ -> sync_reply(From, Tab) end, handle_sync_tabs(Tabs, From);handle_sync_tabs([], _From) -> ok.%%----------------------------------------------------------------------%% Func: handle_info/2%% Returns: {noreply, State} |%% {noreply, State, Timeout} |%% {stop, Reason, State} (terminate/2 is called)%%----------------------------------------------------------------------handle_info({async_dump_log, InitBy}, State) -> Worker = #dump_log{initiated_by = InitBy}, State2 = add_worker(Worker, State), noreply(State2);handle_info(Done, State) when record(Done, dumper_done) -> Pid = Done#dumper_done.worker_pid, Res = Done#dumper_done.worker_res, if State#state.is_stopping == true -> {stop, shutdown, State}; Res == dumped, Pid == State#state.dumper_pid -> [Worker | Rest] = State#state.dumper_queue, reply(Worker#dump_log.opt_reply_to, Res), State2 = State#state{dumper_pid = undefined, dumper_queue = Rest}, State3 = opt_start_worker(State2), noreply(State3); true -> fatal("Dumper failed: ~p~n state: ~p~n", [Res, State]), {stop, fatal, State} end;handle_info(Done, State0) when record(Done, loader_done) -> WPid = Done#loader_done.worker_pid, LateQueue0 = State0#state.late_loader_queue, Tab = Done#loader_done.table_name, State1 = State0#state{loader_pid = lists:keydelete(WPid,1,get_loaders(State0))}, State2 = case Done#loader_done.is_loaded of true -> %% Optional table announcement if Done#loader_done.needs_announce == true, Done#loader_done.needs_reply == true -> i_have_tab(Tab), %% Should be {dumper,add_table_copy} only reply(Done#loader_done.reply_to, Done#loader_done.reply); Done#loader_done.needs_reply == true -> %% Should be {dumper,add_table_copy} only reply(Done#loader_done.reply_to, Done#loader_done.reply); Done#loader_done.needs_announce == true, Tab == schema -> i_have_tab(Tab); Done#loader_done.needs_announce == true -> i_have_tab(Tab), %% Local node needs to perform user_sync_tab/1 Ns = val({current, db_nodes}), abcast(Ns, {i_have_tab, Tab, node()}); Tab == schema -> ignore; true -> %% Local node needs to perform user_sync_tab/1 Ns = val({current, db_nodes}), AlreadyKnows = val({Tab, active_replicas}), abcast(Ns -- AlreadyKnows, {i_have_tab, Tab, node()}) end, %% Optional user sync case Done#loader_done.needs_sync of true -> user_sync_tab(Tab); false -> ignore end, State1#state{late_loader_queue=gb_trees:delete_any(Tab, LateQueue0)}; false -> %% Either the node went down or table was not %% loaded remotly yet case Done#loader_done.needs_reply of true -> reply(Done#loader_done.reply_to, Done#loader_done.reply); false -> ignore end, case ?catch_val({Tab, active_replicas}) of [_|_] -> % still available elsewhere {value,{_,Worker}} = lists:keysearch(WPid,1,get_loaders(State0)), add_loader(Tab,Worker,State1); _ -> State1 end end, State3 = opt_start_worker(State2), noreply(State3);handle_info(Done, State) when record(Done, sender_done) -> Pid = Done#sender_done.worker_pid, Res = Done#sender_done.worker_res, Senders = get_senders(State), {value, {Pid,_Worker}} = lists:keysearch(Pid, 1, Senders), if Res == ok -> State2 = State#state{sender_pid = lists:keydelete(Pid, 1, Senders)}, State3 = opt_start_worker(State2), noreply(State3); true -> %% No need to send any message to the table receiver %% since it will soon get a mnesia_down anyway fatal("Sender failed: ~p~n state: ~p~n", [Res, State]), {stop, fatal, State} end;handle_info({'EXIT', Pid, R}, State) when Pid == State#state.supervisor -> catch set(mnesia_status, stopping), case State#state.dumper_pid of undefined -> dbg_out("~p was ~p~n", [?SERVER_NAME, R]), {stop, shutdown, State}; _ -> noreply(State#state{is_stopping = true}) end;handle_info({'EXIT', Pid, R}, State) when Pid == State#state.dumper_pid -> case State#state.dumper_queue of [#schema_commit_lock{}|Workers] -> %% Schema trans crashed or was killed dbg_out("WARNING: Dumper ~p exited ~p~n", [Pid, R]), State2 = State#state{dumper_queue = Workers, dumper_pid = undefined}, State3 = opt_start_worker(State2), noreply(State3); _Other -> fatal("Dumper or schema commit crashed: ~p~n state: ~p~n", [R, State]), {stop, fatal, State} end;handle_info(Msg = {'EXIT', Pid, R}, State) when R /= wait_for_tables_timeout -> case lists:keymember(Pid, 1, get_senders(State)) of true -> %% No need to send any message to the table receiver %% since it will soon get a mnesia_down anyway fatal("Sender crashed: ~p~n state: ~p~n", [{Pid,R}, State]), {stop, fatal, State}; false -> case lists:keymember(Pid, 1, get_loaders(State)) of true -> fatal("Loader crashed: ~p~n state: ~p~n", [R, State]), {stop, fatal, State}; false -> error("~p got unexpected info: ~p~n", [?SERVER_NAME, Msg]), noreply(State) end end;handle_info({From, get_state}, State) -> From ! {?SERVER_NAME, State}, noreply(State);%% No real need for bufferinghandle_info(Msg, State) when State#state.schema_is_merged /= true -> %% Buffer early messages Msgs = State#state.early_msgs, noreply(State#state{early_msgs = [{info, Msg} | Msgs]});handle_info({'EXIT', Pid, wait_for_tables_timeout}, State) -> sync_tab_timeout(Pid, get()), noreply(State);handle_info(Msg, State) -> error("~p got unexpected info: ~p~n", [?SERVER_NAME, Msg]), noreply(State).sync_tab_timeout(Pid, [{{sync_tab, Tab}, Pids} | Tail]) -> case lists:delete(Pid, Pids) of [] -> erase({sync_tab, Tab}); Pids2 -> put({sync_tab, Tab}, Pids2) end, sync_tab_timeout(Pid, Tail);sync_tab_timeout(Pid, [_ | Tail]) -> sync_tab_timeout(Pid, Tail);sync_tab_timeout(_Pid, []) -> ok.%% Pick the load record that has the highest load order%% Returns {BestLoad, RemainingQueue} or {none, []} if queue is emptypick_next(Queue) -> List = gb_trees:values(Queue), case pick_next(List, none, none) of
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?