mnesia_controller.erl
来自「OTP是开放电信平台的简称」· ERL 代码 · 共 2,103 行 · 第 1/5 页
ERL
2,103 行
none -> {none, gb_trees:empty()}; {Tab, Worker} -> {Worker, gb_trees:delete(Tab,Queue)} end.pick_next([Head | Tail], Load, Order) when record(Head, net_load) -> Tab = Head#net_load.table, select_best(Head, Tail, ?catch_val({Tab, load_order}), Load, Order);pick_next([Head | Tail], Load, Order) when record(Head, disc_load) -> Tab = Head#disc_load.table, select_best(Head, Tail, ?catch_val({Tab, load_order}), Load, Order);pick_next([], none, _Order) -> none;pick_next([], Load, _Order) -> {element(2,Load), Load}.select_best(_Head, Tail, {'EXIT', _WHAT}, Load, Order) -> %% Table have been deleted drop it. pick_next(Tail, Load, Order);select_best(Load, Tail, Order, none, none) -> pick_next(Tail, Load, Order);select_best(Load, Tail, Order, _OldLoad, OldOrder) when Order > OldOrder -> pick_next(Tail, Load, Order);select_best(_Load, Tail, _Order, OldLoad, OldOrder) -> pick_next(Tail, OldLoad, OldOrder).%%----------------------------------------------------------------------%% Func: terminate/2%% Purpose: Shutdown the server%% Returns: any (ignored by gen_server)%%----------------------------------------------------------------------terminate(Reason, State) -> mnesia_monitor:terminate_proc(?SERVER_NAME, Reason, State).%%----------------------------------------------------------------------%% Func: code_change/3%% Purpose: Upgrade process when its code is to be changed%% Returns: {ok, NewState}%%----------------------------------------------------------------------code_change(_OldVsn, State0, _Extra) -> %% Loader Queue State1 = case State0#state.loader_pid of Pids when is_list(Pids) -> State0; undefined -> State0#state{loader_pid = [],loader_queue=gb_trees:empty()}; Pid when is_pid(Pid) -> [Loader|Rest] = State0#state.loader_queue, LQ0 = [{element(2,Rec),Rec} || Rec <- Rest], LQ1 = lists:sort(LQ0), LQ = gb_trees:from_orddict(LQ1), State0#state{loader_pid=[{Pid,Loader}], loader_queue=LQ} end, %% LateLoaderQueue State = if is_list(State1#state.late_loader_queue) -> LLQ0 = State1#state.late_loader_queue, LLQ1 = lists:sort([{element(2,Rec),Rec} || Rec <- LLQ0]), LLQ = gb_trees:from_orddict(LLQ1), State1#state{late_loader_queue=LLQ}; true -> State1 end, {ok, State}. %%%----------------------------------------------------------------------%%% Internal functions%%%----------------------------------------------------------------------maybe_log_mnesia_down(N) -> %% We use mnesia_down when deciding which tables to load locally, %% so if we are not running (i.e haven't decided which tables %% to load locally), don't log mnesia_down yet. case mnesia_lib:is_running() of yes -> verbose("Logging mnesia_down ~w~n", [N]), mnesia_recover:log_mnesia_down(N), ok; _ -> Filter = fun(Tab) -> inactive_copy_holders(Tab, N) end, HalfLoadedTabs = lists:any(Filter, val({schema, local_tables}) -- [schema]), if HalfLoadedTabs == true -> verbose("Logging mnesia_down ~w~n", [N]), mnesia_recover:log_mnesia_down(N), ok; true -> %% Unfortunately we have not loaded some common %% tables yet, so we cannot rely on the nodedown log_later %% BUGBUG handle this case!!! end end.inactive_copy_holders(Tab, Node) -> Cs = val({Tab, cstruct}), case mnesia_lib:cs_to_storage_type(Node, Cs) of unknown -> false; _Storage -> mnesia_lib:not_active_here(Tab) end.orphan_tables([Tab | Tabs], Node, Ns, Local, Remote) -> Cs = val({Tab, cstruct}), CopyHolders = mnesia_lib:copy_holders(Cs), RamCopyHolders = Cs#cstruct.ram_copies, DiscCopyHolders = CopyHolders -- RamCopyHolders, DiscNodes = val({schema, disc_copies}), LocalContent = Cs#cstruct.local_content, RamCopyHoldersOnDiscNodes = mnesia_lib:intersect(RamCopyHolders, DiscNodes), Active = val({Tab, active_replicas}), BeingCreated = (?catch_val({Tab, create_table}) == true), Read = val({Tab, where_to_read}), case lists:member(Node, DiscCopyHolders) of _ when BeingCreated == true -> orphan_tables(Tabs, Node, Ns, Local, Remote); _ when Read == node() -> %% Allready loaded orphan_tables(Tabs, Node, Ns, Local, Remote); true when Active == [] -> case DiscCopyHolders -- Ns of [] -> %% We're last up and the other nodes have not %% loaded the table. Lets load it if we are %% the smallest node. case lists:min(DiscCopyHolders) of Min when Min == node() -> case mnesia_recover:get_master_nodes(Tab) of [] -> L = [Tab | Local], orphan_tables(Tabs, Node, Ns, L, Remote); Masters -> R = [{Tab, Masters} | Remote], orphan_tables(Tabs, Node, Ns, Local, R) end; _ -> orphan_tables(Tabs, Node, Ns, Local, Remote) end; _ -> orphan_tables(Tabs, Node, Ns, Local, Remote) end; false when Active == [], DiscCopyHolders == [], RamCopyHoldersOnDiscNodes == [] -> %% Special case when all replicas resides on disc less nodes orphan_tables(Tabs, Node, Ns, [Tab | Local], Remote); _ when LocalContent == true -> orphan_tables(Tabs, Node, Ns, [Tab | Local], Remote); _ -> orphan_tables(Tabs, Node, Ns, Local, Remote) end;orphan_tables([], _, _, LocalOrphans, RemoteMasters) -> {LocalOrphans, RemoteMasters}.node_has_tabs([Tab | Tabs], Node, State) when Node /= node() -> State2 = case catch update_whereabouts(Tab, Node, State) of State1 = #state{} -> State1; {'EXIT', R} -> %% Tab was just deleted? case ?catch_val({Tab, cstruct}) of {'EXIT', _} -> State; % yes _ -> erlang:fault(R) end end, node_has_tabs(Tabs, Node, State2);node_has_tabs([Tab | Tabs], Node, State) -> user_sync_tab(Tab), node_has_tabs(Tabs, Node, State);node_has_tabs([], _Node, State) -> State.update_whereabouts(Tab, Node, State) -> Storage = val({Tab, storage_type}), Read = val({Tab, where_to_read}), LocalC = val({Tab, local_content}), BeingCreated = (?catch_val({Tab, create_table}) == true), Masters = mnesia_recover:get_master_nodes(Tab), ByForce = val({Tab, load_by_force}), GoGetIt = if ByForce == true -> true; Masters == [] -> true; true -> lists:member(Node, Masters) end, dbg_out("Table ~w is loaded on ~w. s=~w, r=~w, lc=~w, f=~w, m=~w~n", [Tab, Node, Storage, Read, LocalC, ByForce, GoGetIt]), if LocalC == true -> %% Local contents, don't care about other node State; BeingCreated == true -> %% The table is currently being created %% It will be handled elsewhere State; Storage == unknown, Read == nowhere -> %% No own copy, time to read remotely %% if the other node is a good node add_active_replica(Tab, Node), case GoGetIt of true -> set({Tab, where_to_read}, Node), user_sync_tab(Tab), State; false -> State end; Storage == unknown -> %% No own copy, continue to read remotely add_active_replica(Tab, Node), NodeST = mnesia_lib:storage_type_at_node(Node, Tab), ReadST = mnesia_lib:storage_type_at_node(Read, Tab), if %% Avoid reading from disc_only_copies NodeST == disc_only_copies -> ignore; ReadST == disc_only_copies -> mnesia_lib:set_remote_where_to_read(Tab); true -> ignore end, user_sync_tab(Tab), State; Read == nowhere -> %% Own copy, go and get a copy of the table %% if the other node is master or if there %% are no master at all add_active_replica(Tab, Node), case GoGetIt of true -> Worker = #net_load{table = Tab, reason = {active_remote, Node}}, add_worker(Worker, State); false -> State end; true -> %% We already have an own copy add_active_replica(Tab, Node), user_sync_tab(Tab), State end.initial_safe_loads() -> case val({schema, storage_type}) of ram_copies -> Downs = [], Tabs = val({schema, local_tables}) -- [schema], LastC = fun(T) -> last_consistent_replica(T, Downs) end, lists:zf(LastC, Tabs); disc_copies -> Downs = mnesia_recover:get_mnesia_downs(), dbg_out("mnesia_downs = ~p~n", [Downs]), Tabs = val({schema, local_tables}) -- [schema], LastC = fun(T) -> last_consistent_replica(T, Downs) end, lists:zf(LastC, Tabs) end. last_consistent_replica(Tab, Downs) -> Cs = val({Tab, cstruct}), Storage = mnesia_lib:cs_to_storage_type(node(), Cs), Ram = Cs#cstruct.ram_copies, Disc = Cs#cstruct.disc_copies, DiscOnly = Cs#cstruct.disc_only_copies, BetterCopies0 = mnesia_lib:remote_copy_holders(Cs) -- Downs, BetterCopies = BetterCopies0 -- Ram, AccessMode = Cs#cstruct.access_mode, Copies = mnesia_lib:copy_holders(Cs), Masters = mnesia_recover:get_master_nodes(Tab), LocalMaster0 = lists:member(node(), Masters), LocalContent = Cs#cstruct.local_content, RemoteMaster = if Masters == [] -> false; true -> not LocalMaster0 end, LocalMaster = if Masters == [] -> false; true -> LocalMaster0 end, if Copies == [node()] -> %% Only one copy holder and it is local. %% It may also be a local contents table {true, {Tab, local_only}}; LocalContent == true -> {true, {Tab, local_content}}; LocalMaster == true -> %% We have a local master {true, {Tab, local_master}}; RemoteMaster == true -> %% Wait for remote master copy false; Storage == ram_copies -> if Disc == [], DiscOnly == [] -> %% Nobody has copy on disc {true, {Tab, ram_only}}; true -> %% Some other node has copy on disc false end; AccessMode == read_only -> %% No one has been able to update the table, %% i.e. all disc resident copies are equal {true, {Tab, read_only}}; BetterCopies /= [], Masters /= [node()] -> %% There are better copies on other nodes %% and we do not have the only master copy false; true -> {true, {Tab, initial}} end.reconfigure_tables(N, [Tab |Tail]) -> del_active_replica(Tab, N), case val({Tab, where_to_read}) of N -> mnesia_lib:set_remote_where_to_read(Tab); _ -> ignore end, reconfigure_tables(N, Tail);reconfigure_tables(_, []) -> ok.remove_loaders([Tab| Tabs], N, Loaders) -> LateQ = drop_loaders(Tab, N, Loaders), remove_loaders(Tabs, N, LateQ);remove_loaders([],_, LateQ) -> LateQ.remove_early_messages([], _Node) -> [];remove_early_messages([{call, {add_active_replica, [_, Node, _, _], _}, _}|R], Node) -> remove_early_messages(R, Node); %% Does a reply before queuingremove_early_messages([{call, {block_table, _, From}, ReplyTo}|R], Node) when node(From) == Node -> reply(ReplyTo, ok), %% Remove gen:server waits.. remove_early_messages(R, Node);remove_early_messages([{cast, {i_have_tab, _Tab, Node}}|R], Node) -> remove_early_messages(R, Node);remove_early_messages([{cast, {adopt_orphans, Node, _Tabs}}|R], Node) -> remove_early_messages(R, Node);remove_early_messages([M|R],Node) -> [M|remove_early_messages(R,Node)].%% Drop loader from late load queue and possibly trigger a disc_loaddrop_loaders(Tab, Node, LLQ) -> case gb_trees:lookup(Tab,LLQ) of none -> LLQ; {value, H} -> %% Check if it is time to issue a disc_load request case H#late_load.loaders of [Node] -> Reason = {H#late_load.reason, last_loader_down, Node}, cast({disc_load, Tab, Reason}); % Ugly cast _ -> ignore end, %% Drop the node from the list of loaders H2 = H#late_load{loaders = H#late_load.loaders -- [Node]}, gb_trees:update(Tab, H2, LLQ) end.add_active_replica(Tab, Node) -> add_active_replica(Tab, Node, val({Tab, cstruct})).add_active_replica(Tab, Node, Cs) when record(Cs, cstruct) -> Storage = mnesia_lib:schema_cs_to_storage_type(Node, Cs), AccessMode = Cs#cstruct.access_mode, add_active_replica(Tab, Node, Storage, AccessMode).%% Block table primitivesblock_table(Tab) -> Var = {Tab, where_to_commit}, Old = val(Var), New = {blocked, Old}, set(Var, New). % where_to_commitunblock_table(Tab) -> call({unblock_table, Tab}).is_tab_blocked(W2C) when list(W2C) -> {false, W2C};is_tab_blocked({blocked, W2C}) when list(W2C) -> {true, W2C}.mark_blocked_tab(true, Value) -> {blocked, Value};mark_blocked_tab(false, Value) -> Value.%%add_active_replica(Tab, Node, Storage, AccessMode) -> Var = {Tab, where_to_commit}, {Blocked, Old} = is_tab_blocked(val(Var)), Del = lists:keydelete(Node, 1, Old), case AccessMode of read_write -> New = lists:sort([{Node, Storage} | Del]), set(Var, mark_blocked_tab(Blocked, New)), % where_to_commit mnesia_lib:add_lsort({Tab, where_to_write}, Node); read_only -> set(Var, mark_blocked_tab(Blocked, Del)), mnesia_lib:del({Tab, where_to_write}, Node) end, add({Tab, active_replicas}, Node).del_active_replica(Tab, Node) -> Var = {Tab, where_to_commit}, {Blocked, Old} = is_tab_blocked(val(Var)), Del = lists:keydelete(Node, 1, Old), New = lists:sort(Del), set(Var, mark_blocked_tab(Blocked, New)), % where_to_commit mnesia_lib:del({Tab, active_replicas}, Node), mnesia_lib:del({Tab, where_to_write}, Node).change_table_access_mode(Cs) -> W = fun() -> Tab = Cs#cstruct.name,
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?