mnesia_controller.erl

来自「OTP是开放电信平台的简称」· ERL 代码 · 共 2,103 行 · 第 1/5 页

ERL
2,103
字号
	none -> {none, gb_trees:empty()};	{Tab, Worker} -> {Worker, gb_trees:delete(Tab,Queue)}    end.pick_next([Head | Tail], Load, Order) when record(Head, net_load) ->    Tab = Head#net_load.table,    select_best(Head, Tail, ?catch_val({Tab, load_order}), Load, Order);pick_next([Head | Tail], Load, Order) when record(Head, disc_load) ->    Tab = Head#disc_load.table,    select_best(Head, Tail, ?catch_val({Tab, load_order}), Load, Order);pick_next([], none, _Order) ->    none;pick_next([], Load, _Order) ->    {element(2,Load), Load}.select_best(_Head, Tail, {'EXIT', _WHAT}, Load, Order) ->    %% Table have been deleted drop it.    pick_next(Tail, Load, Order);select_best(Load, Tail, Order, none, none) ->    pick_next(Tail, Load, Order);select_best(Load, Tail, Order, _OldLoad, OldOrder) when Order > OldOrder ->    pick_next(Tail, Load, Order);select_best(_Load, Tail, _Order, OldLoad, OldOrder) ->    pick_next(Tail, OldLoad, OldOrder).%%----------------------------------------------------------------------%% Func: terminate/2%% Purpose: Shutdown the server%% Returns: any (ignored by gen_server)%%----------------------------------------------------------------------terminate(Reason, State) ->    mnesia_monitor:terminate_proc(?SERVER_NAME, Reason, State).%%----------------------------------------------------------------------%% Func: code_change/3%% Purpose: Upgrade process when its code is to be changed%% Returns: {ok, NewState}%%----------------------------------------------------------------------code_change(_OldVsn, State0, _Extra) ->    %% Loader Queue    State1 = case State0#state.loader_pid of		 Pids when is_list(Pids) -> State0;		 undefined -> State0#state{loader_pid = [],loader_queue=gb_trees:empty()};		 Pid when is_pid(Pid) -> 		     [Loader|Rest] = State0#state.loader_queue,		     LQ0 = [{element(2,Rec),Rec} || Rec <- Rest],		     LQ1 = lists:sort(LQ0),		     LQ  = gb_trees:from_orddict(LQ1),		     State0#state{loader_pid=[{Pid,Loader}], loader_queue=LQ}	     end,    %% LateLoaderQueue    State = if is_list(State1#state.late_loader_queue) -> 		    LLQ0 = State1#state.late_loader_queue,		    LLQ1 = lists:sort([{element(2,Rec),Rec} || Rec <- LLQ0]),		    LLQ  = gb_trees:from_orddict(LLQ1),		    State1#state{late_loader_queue=LLQ};	       true ->		    State1	    end,    {ok, State}. %%%----------------------------------------------------------------------%%% Internal functions%%%----------------------------------------------------------------------maybe_log_mnesia_down(N) ->    %% We use mnesia_down when deciding which tables to load locally,    %% so if we are not running (i.e haven't decided which tables    %% to load locally), don't log mnesia_down yet.    case mnesia_lib:is_running() of	yes -> 	    verbose("Logging mnesia_down ~w~n", [N]),	    mnesia_recover:log_mnesia_down(N),	    ok;	_ ->	    	    	    Filter = fun(Tab) ->			     inactive_copy_holders(Tab, N)		     end,	    HalfLoadedTabs = lists:any(Filter, val({schema, local_tables}) -- [schema]),	    if 		HalfLoadedTabs == true ->		    verbose("Logging mnesia_down ~w~n", [N]),		    mnesia_recover:log_mnesia_down(N),		    ok;				true ->		    %% Unfortunately we have not loaded some common		    %% tables yet, so we cannot rely on the nodedown		    log_later   %% BUGBUG handle this case!!!	    end    end.inactive_copy_holders(Tab, Node) ->    Cs = val({Tab, cstruct}),    case mnesia_lib:cs_to_storage_type(Node, Cs) of	unknown ->	    false;	_Storage ->	    mnesia_lib:not_active_here(Tab)    end.orphan_tables([Tab | Tabs], Node, Ns, Local, Remote) ->    Cs = val({Tab, cstruct}),    CopyHolders = mnesia_lib:copy_holders(Cs),    RamCopyHolders = Cs#cstruct.ram_copies,    DiscCopyHolders = CopyHolders -- RamCopyHolders,    DiscNodes = val({schema, disc_copies}),    LocalContent = Cs#cstruct.local_content,    RamCopyHoldersOnDiscNodes = mnesia_lib:intersect(RamCopyHolders, DiscNodes),    Active = val({Tab, active_replicas}),    BeingCreated = (?catch_val({Tab, create_table}) == true),    Read = val({Tab, where_to_read}),    case lists:member(Node, DiscCopyHolders) of	_ when BeingCreated == true -> 	    orphan_tables(Tabs, Node, Ns, Local, Remote);	_ when Read == node() -> %% Allready loaded	    orphan_tables(Tabs, Node, Ns, Local, Remote);	true when Active == [] ->	    case DiscCopyHolders -- Ns of		[] ->		    %% We're last up and the other nodes have not		    %% loaded the table. Lets load it if we are		    %% the smallest node.		    case lists:min(DiscCopyHolders) of			Min when Min == node() ->			    case mnesia_recover:get_master_nodes(Tab) of				[] ->				    L = [Tab | Local],				    orphan_tables(Tabs, Node, Ns, L, Remote);				Masters ->				    R = [{Tab, Masters} | Remote],				    orphan_tables(Tabs, Node, Ns, Local, R)			    end;			_ ->			    orphan_tables(Tabs, Node, Ns, Local, Remote)		    end;		_ ->		    orphan_tables(Tabs, Node, Ns, Local, Remote)	    end;	false when Active == [], DiscCopyHolders == [], RamCopyHoldersOnDiscNodes == [] ->	    %% Special case when all replicas resides on disc less nodes	    orphan_tables(Tabs, Node, Ns, [Tab | Local], Remote);	_ when LocalContent == true ->	    orphan_tables(Tabs, Node, Ns, [Tab | Local], Remote);	_ ->	    orphan_tables(Tabs, Node, Ns, Local, Remote)    end;orphan_tables([], _, _, LocalOrphans, RemoteMasters) ->    {LocalOrphans, RemoteMasters}.node_has_tabs([Tab | Tabs], Node, State) when Node /= node() ->    State2 = 	case catch update_whereabouts(Tab, Node, State) of	    State1 = #state{} -> State1;	    {'EXIT', R} ->  %% Tab was just deleted?		case ?catch_val({Tab, cstruct}) of		    {'EXIT', _} -> State; % yes		    _ ->  erlang:fault(R) 		end	end,    node_has_tabs(Tabs, Node, State2);node_has_tabs([Tab | Tabs], Node, State) ->    user_sync_tab(Tab),    node_has_tabs(Tabs, Node, State);node_has_tabs([], _Node, State) ->    State.update_whereabouts(Tab, Node, State) ->    Storage = val({Tab, storage_type}),    Read = val({Tab, where_to_read}),    LocalC = val({Tab, local_content}),    BeingCreated = (?catch_val({Tab, create_table}) == true),    Masters = mnesia_recover:get_master_nodes(Tab),    ByForce = val({Tab, load_by_force}),    GoGetIt =	if	    ByForce == true ->		true;	    Masters == [] ->		true;	    true ->		lists:member(Node, Masters)	end,        dbg_out("Table ~w is loaded on ~w. s=~w, r=~w, lc=~w, f=~w, m=~w~n",	    [Tab, Node, Storage, Read, LocalC, ByForce, GoGetIt]),    if	LocalC == true ->	    %% Local contents, don't care about other node	    State;	BeingCreated == true ->	    	    %% The table is currently being created	    %% It will be handled elsewhere	    State;	Storage == unknown, Read == nowhere ->	    %% No own copy, time to read remotely	    %% if the other node is a good node	    add_active_replica(Tab, Node),	    case GoGetIt of		true ->		    set({Tab, where_to_read}, Node),		    user_sync_tab(Tab),		    State;		false ->		    State	    end;	Storage == unknown ->	    %% No own copy, continue to read remotely	    	    add_active_replica(Tab, Node),	    	    NodeST = mnesia_lib:storage_type_at_node(Node, Tab),	    ReadST = mnesia_lib:storage_type_at_node(Read, Tab),	    if   %% Avoid reading from disc_only_copies		NodeST == disc_only_copies ->		    ignore;		ReadST == disc_only_copies ->		    mnesia_lib:set_remote_where_to_read(Tab);		true ->		    ignore	    end,	    user_sync_tab(Tab),	    State;	Read == nowhere ->	    %% Own copy, go and get a copy of the table	    %% if the other node is master or if there	    %% are no master at all	    add_active_replica(Tab, Node),	    case GoGetIt of		true ->		    Worker = #net_load{table = Tab,				       reason = {active_remote, Node}},		    add_worker(Worker, State);		false ->		    State	    end;	true ->	    %% We already have an own copy	    add_active_replica(Tab, Node),	    user_sync_tab(Tab),	    State    end.initial_safe_loads() ->    case val({schema, storage_type}) of	ram_copies ->	    Downs = [],	    Tabs = val({schema, local_tables}) -- [schema],	    LastC = fun(T) -> last_consistent_replica(T, Downs) end,	    lists:zf(LastC, Tabs);		disc_copies ->	    Downs = mnesia_recover:get_mnesia_downs(),	    dbg_out("mnesia_downs = ~p~n", [Downs]),			    Tabs = val({schema, local_tables}) -- [schema],	    LastC = fun(T) -> last_consistent_replica(T, Downs) end,	    lists:zf(LastC, Tabs)    end.    last_consistent_replica(Tab, Downs) ->    Cs = val({Tab, cstruct}),    Storage = mnesia_lib:cs_to_storage_type(node(), Cs),    Ram = Cs#cstruct.ram_copies,    Disc = Cs#cstruct.disc_copies,    DiscOnly = Cs#cstruct.disc_only_copies,    BetterCopies0 = mnesia_lib:remote_copy_holders(Cs) -- Downs,    BetterCopies = BetterCopies0 -- Ram,    AccessMode = Cs#cstruct.access_mode,    Copies = mnesia_lib:copy_holders(Cs),    Masters = mnesia_recover:get_master_nodes(Tab),    LocalMaster0 = lists:member(node(), Masters),    LocalContent = Cs#cstruct.local_content,    RemoteMaster =	if	    Masters == [] -> false;	    true -> not LocalMaster0	end,    LocalMaster =	if	    Masters == [] -> false;	    true -> LocalMaster0	end,    if	Copies == [node()]  ->	    %% Only one copy holder and it is local.	    %% It may also be a local contents table	    {true, {Tab, local_only}};	LocalContent == true ->	    {true, {Tab, local_content}};	LocalMaster == true ->	    %% We have a local master	    {true, {Tab, local_master}};	RemoteMaster == true ->	    %% Wait for remote master copy	    false;	Storage == ram_copies ->	    if		Disc == [], DiscOnly == [] ->		    %% Nobody has copy on disc		    {true, {Tab, ram_only}};		true ->		    %% Some other node has copy on disc		    false	    end;	AccessMode == read_only ->	    %% No one has been able to update the table,	    %% i.e. all disc resident copies are equal	    {true, {Tab, read_only}};	BetterCopies /= [], Masters /= [node()] ->	    %% There are better copies on other nodes	    %% and we do not have the only master copy	    false;	true ->	    {true, {Tab, initial}}    end.reconfigure_tables(N, [Tab |Tail]) ->    del_active_replica(Tab, N),    case val({Tab, where_to_read}) of	N ->  mnesia_lib:set_remote_where_to_read(Tab);	_ ->  ignore    end,    reconfigure_tables(N, Tail);reconfigure_tables(_, []) ->    ok.remove_loaders([Tab| Tabs], N, Loaders) ->    LateQ = drop_loaders(Tab, N, Loaders),    remove_loaders(Tabs, N, LateQ);remove_loaders([],_, LateQ) -> LateQ.remove_early_messages([], _Node) ->    [];remove_early_messages([{call, {add_active_replica, [_, Node, _, _], _}, _}|R], Node) ->    remove_early_messages(R, Node); %% Does a reply before queuingremove_early_messages([{call, {block_table, _, From}, ReplyTo}|R], Node)   when node(From) == Node ->    reply(ReplyTo, ok),  %% Remove gen:server waits..    remove_early_messages(R, Node);remove_early_messages([{cast, {i_have_tab, _Tab, Node}}|R], Node) ->    remove_early_messages(R, Node);remove_early_messages([{cast, {adopt_orphans, Node, _Tabs}}|R], Node) ->    remove_early_messages(R, Node);remove_early_messages([M|R],Node) ->    [M|remove_early_messages(R,Node)].%% Drop loader from late load queue and possibly trigger a disc_loaddrop_loaders(Tab, Node, LLQ) ->    case gb_trees:lookup(Tab,LLQ) of	none ->	    LLQ;	{value, H} ->	    %% Check if it is time to issue a disc_load request	    case H#late_load.loaders of		[Node] ->		    Reason = {H#late_load.reason, last_loader_down, Node},		    cast({disc_load, Tab, Reason});  % Ugly cast		_ ->		    ignore	    end,	    %% Drop the node from the list of loaders	    H2 = H#late_load{loaders = H#late_load.loaders -- [Node]},	    gb_trees:update(Tab, H2, LLQ)    end.add_active_replica(Tab, Node) ->    add_active_replica(Tab, Node, val({Tab, cstruct})).add_active_replica(Tab, Node, Cs) when record(Cs, cstruct) ->    Storage = mnesia_lib:schema_cs_to_storage_type(Node, Cs),    AccessMode = Cs#cstruct.access_mode,    add_active_replica(Tab, Node, Storage, AccessMode).%% Block table primitivesblock_table(Tab) ->    Var = {Tab, where_to_commit},    Old = val(Var),    New = {blocked, Old},    set(Var, New). % where_to_commitunblock_table(Tab) ->    call({unblock_table, Tab}).is_tab_blocked(W2C) when list(W2C) ->    {false, W2C};is_tab_blocked({blocked, W2C}) when list(W2C) ->    {true, W2C}.mark_blocked_tab(true, Value) ->     {blocked, Value};mark_blocked_tab(false, Value) ->     Value.%%add_active_replica(Tab, Node, Storage, AccessMode) ->    Var = {Tab, where_to_commit},    {Blocked, Old} = is_tab_blocked(val(Var)),    Del = lists:keydelete(Node, 1, Old),    case AccessMode of	read_write ->	    New = lists:sort([{Node, Storage} | Del]),	    set(Var, mark_blocked_tab(Blocked, New)), % where_to_commit	    mnesia_lib:add_lsort({Tab, where_to_write}, Node);	read_only ->	    set(Var, mark_blocked_tab(Blocked, Del)),	    mnesia_lib:del({Tab, where_to_write}, Node)    end,    add({Tab, active_replicas}, Node).del_active_replica(Tab, Node) ->    Var = {Tab, where_to_commit},    {Blocked, Old} = is_tab_blocked(val(Var)),    Del = lists:keydelete(Node, 1, Old),    New = lists:sort(Del),    set(Var, mark_blocked_tab(Blocked, New)),      % where_to_commit    mnesia_lib:del({Tab, active_replicas}, Node),    mnesia_lib:del({Tab, where_to_write}, Node).change_table_access_mode(Cs) ->    W = fun() -> 		Tab = Cs#cstruct.name,

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?