%% File: mnesia_controller.erl
%% [Viewer header, translated from Chinese]
%% From "OTP is short for Open Telecom Platform" · Erlang (ERL) code ·
%% 2,103 lines total · page 1 of 5 · font-size control.
		lists:foreach(fun(N) -> add_active_replica(Tab, N, Cs) end,
			      val({Tab, active_replicas}))
	end,
    update(W).
%% NOTE(review): the clause above is the tail of a function whose head is
%% on an earlier, not shown, page of this file.

%% node To now has tab loaded, but this must be undone
%% This code is rpc:call'ed from the tab_copier process
%% when it has *not* released its table lock
unannounce_add_table_copy(Tab, To) ->
    catch del_active_replica(Tab, To),
    case catch val({Tab , where_to_read}) of
	To ->
	    mnesia_lib:set_remote_where_to_read(Tab);
	_ ->
	    ignore
    end.

%% When debug tracing is enabled, subscribe mnesia_event to Tab's table
%% events; then wake up every process recorded under {sync_tab, Tab} in
%% the process dictionary by sending each a sync_reply.
user_sync_tab(Tab) ->
    case val(debug) of
	trace ->
	    mnesia_subscr:subscribe(whereis(mnesia_event), {table, Tab});
	_ ->
	    ignore
    end,
    case erase({sync_tab, Tab}) of
	undefined ->
	    ok;
	Pids ->
	    lists:foreach(fun(Pid) -> sync_reply(Pid, Tab) end, Pids)
    end.

%% Record that this node has Tab loaded: fix the where_to_read
%% whereabouts (local_content tables are always read locally) and
%% register this node as an active replica.
i_have_tab(Tab) ->
    case val({Tab, local_content}) of
	true ->
	    mnesia_lib:set_local_content_whereabouts(Tab);
	false ->
	    set({Tab, where_to_read}, node())
    end,
    add_active_replica(Tab, node()).

%% Block Tab on ToNode, then tell ToNode and all other current db nodes
%% to add ToNode as an active replica of Tab.  Never used for the
%% schema table (guard).
sync_and_block_table_whereabouts(Tab, ToNode, RemoteS, AccessMode) when Tab /= schema ->
    Current = val({current, db_nodes}),
    Ns =
	case lists:member(ToNode, Current) of
	    true -> Current -- [ToNode];
	    false -> Current
	end,
    remote_call(ToNode, block_table, [Tab]),
    [remote_call(Node, add_active_replica, [Tab, ToNode, RemoteS, AccessMode]) ||
	Node <- [ToNode | Ns]],
    ok.

%% Undo the whereabouts announcement of ToNode's copy of Tab on every
%% current db node (making sure ToNode itself is included).  Never used
%% for the schema table (guard).
sync_del_table_copy_whereabouts(Tab, ToNode) when Tab /= schema ->
    Current = val({current, db_nodes}),
    Ns =
	case lists:member(ToNode, Current) of
	    true -> Current;
	    false -> [ToNode | Current]
	end,
    Args = [Tab, ToNode],
    [remote_call(Node, unannounce_add_table_copy, Args) || Node <- Ns],
    ok.

%% Fetch a printable snapshot of the controller's state, or
%% {timeout, Timeout} if the server is absent or does not answer.
get_info(Timeout) ->
    case whereis(?SERVER_NAME) of
	undefined ->
	    {timeout, Timeout};
	Pid ->
	    Pid !
%% (continuation of get_info/1: the get_state request and its reply)
{self(), get_state},
	    receive
		{?SERVER_NAME, State = #state{loader_queue=LQ,late_loader_queue=LLQ}} ->
		    %% Convert the gb_trees queues to lists so the
		    %% caller gets a plain, printable snapshot.
		    {info,State#state{loader_queue=gb_trees:to_list(LQ),
				      late_loader_queue=gb_trees:to_list(LLQ)}}
	    after Timeout ->
		    {timeout, Timeout}
	    end
    end.

%% Ask the controller for its worker pids (loaders, senders and the
%% current dumper).  Returns {timeout, Timeout} if the server is not
%% registered or does not answer within Timeout.
get_workers(Timeout) ->
    case whereis(?SERVER_NAME) of
	undefined ->
	    {timeout, Timeout};
	Pid ->
	    Pid ! {self(), get_state},
	    receive
		%% was `when record(State, state)` -- the record/2 guard
		%% BIF is obsolete; is_record/2 is the supported form.
		{?SERVER_NAME, State} when is_record(State, state) ->
		    {workers, get_loaders(State), get_senders(State), State#state.dumper_pid}
	    after Timeout ->
		    {timeout, Timeout}
	    end
    end.

%% Print a per-table size summary for all locally active tables.
info() ->
    Tabs = mnesia_lib:local_active_tables(),
    io:format( "---> Active tables <--- ~n", []),
    info(Tabs).

%% Print one line per table: disc_only_copies tables are measured via
%% dets, everything else via ets.
info([Tab | Tail]) ->
    case val({Tab, storage_type}) of
	disc_only_copies ->
	    info_format(Tab,
			dets:info(Tab, size),
			dets:info(Tab, file_size),
			"bytes on disc");
	_ ->
	    info_format(Tab,
			?ets_info(Tab, size),
			?ets_info(Tab, memory),
			"words of mem")
    end,
    info(Tail);
info([]) -> ok.

%% Pretty-print one table row, padded into fixed-width columns.
info_format(Tab, Size, Mem, Media) ->
    StrT = mnesia_lib:pad_name(atom_to_list(Tab), 15, []),
    StrS = mnesia_lib:pad_name(integer_to_list(Size), 8, []),
    StrM = mnesia_lib:pad_name(integer_to_list(Mem), 8, []),
    io:format("~s: with ~s records occupying ~s ~s~n",
	      [StrT, StrS, StrM, Media]).

%% Handle early arrived messages
handle_early_msgs([Msg | Msgs], State) ->
    %% The messages are in reverse order
    case handle_early_msg(Msg, State) of
%%         {stop, Reason, Reply, State2} ->  % Will not happen according to dialyzer
%% 	    {stop, Reason, Reply, State2};
        {stop, Reason, State2} ->
	    {stop, Reason, State2};
	{noreply, State2} ->
	    handle_early_msgs(Msgs, State2);
	{reply, Reply, State2} ->
	    {call, _Call, From} = Msg,
	    reply(From, Reply),
	    handle_early_msgs(Msgs, State2)
    end;
handle_early_msgs([], State) ->
    noreply(State).

%% Re-dispatch one buffered message to the ordinary gen_server callbacks.
handle_early_msg({call, Msg, From}, State) ->
    handle_call(Msg, From,
%% (continuation of handle_early_msg/2, first clause)
State);
handle_early_msg({cast, Msg}, State) ->
    handle_cast(Msg, State);
handle_early_msg({info, Msg}, State) ->
    handle_info(Msg, State).

noreply(State) ->
    {noreply, State}.

%% Reply to a gen_server caller unless the request carried no reply
%% address; always return Reply so the result can be reused.
reply(undefined, Reply) ->
    Reply;
reply(ReplyTo, Reply) ->
    gen_server:reply(ReplyTo, Reply),
    Reply.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Worker management

%% Returns new State
%% NOTE: the obsolete record/2 guards were replaced with is_record/2,
%% the form supported by current compilers.  Behaviour is unchanged.
add_worker(Worker, State) when is_record(Worker, dump_log) ->
    InitBy = Worker#dump_log.initiated_by,
    Queue = State#state.dumper_queue,
    case lists:keymember(InitBy, #dump_log.initiated_by, Queue) of
	false ->
	    ignore;
	true when Worker#dump_log.opt_reply_to == undefined ->
	    %% The same threshold has been exceeded again,
	    %% before we have had the possibility to
	    %% process the older one.
	    DetectedBy = {dump_log, InitBy},
	    Event = {mnesia_overload, DetectedBy},
	    mnesia_lib:report_system_event(Event)
    end,
    Queue2 = Queue ++ [Worker],
    State2 = State#state{dumper_queue = Queue2},
    opt_start_worker(State2);
add_worker(Worker, State) when is_record(Worker, schema_commit_lock) ->
    Queue = State#state.dumper_queue,
    Queue2 = Queue ++ [Worker],
    State2 = State#state{dumper_queue = Queue2},
    opt_start_worker(State2);
add_worker(Worker, State) when is_record(Worker, net_load) ->
    opt_start_worker(add_loader(Worker#net_load.table,Worker,State));
add_worker(Worker, State) when is_record(Worker, send_table) ->
    Queue = State#state.sender_queue,
    State2 = State#state{sender_queue = Queue ++ [Worker]},
    opt_start_worker(State2);
add_worker(Worker, State) when is_record(Worker, disc_load) ->
    opt_start_worker(add_loader(Worker#disc_load.table,Worker,State));
% Block controller should be used for upgrading mnesia.
add_worker(Worker, State) when is_record(Worker, block_controller) ->
    Queue = State#state.dumper_queue,
    %% A block_controller request jumps to the front of the dumper queue.
    Queue2 = [Worker | Queue],
    State2 = State#state{dumper_queue = Queue2},
    opt_start_worker(State2).

%% Enqueue a loader for Tab unless one is already queued for that table.
add_loader(Tab,Worker,State =
%% (continuation of add_loader/3)
#state{loader_queue=LQ0}) ->
    case gb_trees:is_defined(Tab, LQ0) of
	true -> State;
	false ->
	    LQ=gb_trees:insert(Tab, Worker, LQ0),
	    State#state{loader_queue=LQ}
    end.

%% Optionally start a worker
%%
%% Dumpers and loaders may run simultaneously
%% but neither of them may run during schema commit.
%% Loaders may not start if a schema commit is enqueued.
opt_start_worker(State) when State#state.is_stopping == true ->
    State;
opt_start_worker(State) ->
    %% Prioritize dumper and schema commit
    %% by checking them first
    case State#state.dumper_queue of
	[Worker | _Rest] when State#state.dumper_pid == undefined ->
	    %% Great, a worker in queue and neither
	    %% a schema transaction is being
	    %% committed and nor a dumper is running

	    %% Start worker but keep him in the queue
	    %% NOTE: obsolete record/2 tests replaced with is_record/2.
	    if
		is_record(Worker, schema_commit_lock) ->
		    ReplyTo = Worker#schema_commit_lock.owner,
		    reply(ReplyTo, granted),
		    {Owner, _Tag} = ReplyTo,
		    opt_start_loader(State#state{dumper_pid = Owner});

		is_record(Worker, dump_log) ->
		    Pid = spawn_link(?MODULE, dump_and_reply, [self(), Worker]),
		    State2 = State#state{dumper_pid = Pid},
		    %% If the worker was a dumper we may
		    %% possibly be able to start a loader
		    %% or sender
		    State3 = opt_start_sender(State2),
		    opt_start_loader(State3);

		is_record(Worker, block_controller) ->
		    %% Only grant the block when no senders or loaders
		    %% are currently running.
		    case {get_senders(State), get_loaders(State)} of
			{[], []} ->
			    ReplyTo = Worker#block_controller.owner,
			    reply(ReplyTo, granted),
			    {Owner, _Tag} = ReplyTo,
			    State#state{dumper_pid = Owner};
			_ ->
			    State
		    end
	    end;
	_ ->
	    %% Bad luck, try with a loader or sender instead
	    State2 = opt_start_sender(State),
	    opt_start_loader(State2)
    end.

%% Start as many queued senders as possible; keep the rest queued.
opt_start_sender(State) ->
    case State#state.sender_queue of
	[]->   State;	    %% No need
	SenderQ ->
	    {NewS,Kept} = opt_start_sender2(SenderQ, get_senders(State),
					    [], get_loaders(State)),
	    State#state{sender_pid = NewS,
%% (continuation of opt_start_sender/1: store the kept sender queue)
sender_queue = Kept}
    end.

%% Walk the sender queue: start a sender for each table that is active
%% on this node and not currently being loaded here; keep still-loading
%% tables queued; notify the receiver and drop requests for tables that
%% are not active on this node.
opt_start_sender2([], Pids,Kept, _) -> {Pids,Kept};
opt_start_sender2([Sender|R], Pids, Kept, LoaderQ) ->
    Tab = Sender#send_table.table,
    Active = val({Tab, active_replicas}),
    IgotIt = lists:member(node(), Active),
    %% A running loader entry is {Pid, #net_load{} | #disc_load{}};
    %% both records keep the table in the same element position.
    IsLoading = lists:any(fun({_Pid,Loader}) ->
				  Tab == element(#net_load.table, Loader)
			  end, LoaderQ),
    if
	IgotIt, IsLoading  ->
	    %% I'm currently finishing loading the table let him wait
	    opt_start_sender2(R,Pids, [Sender|Kept], LoaderQ);
	IgotIt ->
	    %% Start worker but keep him in the queue
	    Pid = spawn_link(?MODULE, send_and_reply,[self(), Sender]),
	    opt_start_sender2(R,[{Pid,Sender}|Pids],Kept,LoaderQ);
	true ->
	    verbose("Send table failed ~p not active on this node ~n", [Tab]),
	    Sender#send_table.receiver_pid ! {copier_done, node()},
	    opt_start_sender2(R,Pids, Kept, LoaderQ)
    end.

%% Start the next queued loader, provided we are below max_loaders()
%% and no schema commit is waiting in the dumper queue.  Tables that
%% already have a running loader are dequeued without starting another.
opt_start_loader(State = #state{loader_queue = LoaderQ}) ->
    Current = get_loaders(State),
    Max = max_loaders(),
    case gb_trees:is_empty(LoaderQ) of
	true ->
	    State;
	_ when length(Current) >= Max ->
	    State;
	false ->
	    SchemaQueue = State#state.dumper_queue,
	    case lists:keymember(schema_commit_lock, 1, SchemaQueue) of
		false ->
		    case pick_next(LoaderQ) of
			{none,Rest} ->
			    State#state{loader_queue=Rest};
			{Worker,Rest} ->
			    case already_loading(Worker, get_loaders(State)) of
				true ->
				    %% Already being loaded; just drop it
				    %% from the queue and try the next one.
				    opt_start_loader(State#state{loader_queue = Rest});
				false ->
				    %% Start worker but keep him in the queue
				    Pid = load_and_reply(self(), Worker),
				    State#state{loader_pid=[{Pid,Worker}|get_loaders(State)],
						loader_queue = Rest}
			    end
		    end;
		true ->
		    %% Bad luck, we must wait for the schema commit
		    State
	    end
    end.

%% True when a loader (net or disc) is already running for the
%% worker's table.
already_loading(#net_load{table=Tab},Loaders) ->
    already_loading2(Tab,Loaders);
already_loading(#disc_load{table=Tab},Loaders) ->
    already_loading2(Tab,Loaders).

already_loading2(Tab, [{_,#net_load{table=Tab}}|_]) ->
%% (continuation of already_loading2/2)
true;
already_loading2(Tab, [{_,#disc_load{table=Tab}}|_]) -> true;
already_loading2(Tab, [_|Rest]) -> already_loading2(Tab,Rest);
already_loading2(_,[]) -> false.

%% Ask the controller on Node to start sending Tab to Receiver.
start_remote_sender(Node, Tab, Receiver, Storage) ->
    Msg = #send_table{table = Tab,
		      receiver_pid = Receiver,
		      remote_storage = Storage},
    gen_server:cast({?SERVER_NAME, Node}, Msg).

%% Run a log dump and send the #dumper_done{} result back to ReplyTo.
dump_and_reply(ReplyTo, Worker) ->
    %% No trap_exit, die intentionally instead
    Res = mnesia_dumper:opt_dump_log(Worker#dump_log.initiated_by),
    ReplyTo ! #dumper_done{worker_pid = self(),
			   worker_res = Res},
    unlink(ReplyTo),
    exit(normal).

%% Send a table to its receiver and report a #sender_done{} to ReplyTo.
send_and_reply(ReplyTo, Worker) ->
    %% No trap_exit, die intentionally instead
    Res = mnesia_loader:send_table(Worker#send_table.receiver_pid,
				   Worker#send_table.table,
				   Worker#send_table.remote_storage),
    ReplyTo ! #sender_done{worker_pid = self(),
			   worker_res = Res},
    unlink(ReplyTo),
    exit(normal).

%% Spawn a linked loader process that runs the fun built by
%% load_table_fun/1 and sends the #loader_done{} result to ReplyTo.
load_and_reply(ReplyTo, Worker) ->
    Load = load_table_fun(Worker),
    SendAndReply =
	fun() ->
		process_flag(trap_exit, true),
		Done = Load(),
		ReplyTo !
%% (continuation of load_and_reply/2's worker fun)
Done#loader_done{worker_pid = self()},
		unlink(ReplyTo),
		exit(normal)
	end,
    spawn_link(SendAndReply).

%% Now it is time to load the table
%% but first we must check if it still is necessary
%% NOTE: obsolete record/2 guards replaced with is_record/2.
load_table_fun(Load) when is_record(Load, net_load) ->
    Tab = Load#net_load.table,
    ReplyTo = Load#net_load.opt_reply_to,
    Reason =  Load#net_load.reason,
    LocalC = val({Tab, local_content}),
    AccessMode = val({Tab, access_mode}),
    ReadNode = val({Tab, where_to_read}),
    Active = filter_active(Tab),
    %% Template result; the funs below adjust it per outcome.
    Done = #loader_done{is_loaded = true,
			table_name = Tab,
			needs_announce = false,
			needs_sync = false,
			needs_reply = (ReplyTo /= undefined),
			reply_to = ReplyTo,
			reply = {loaded, ok}
		       },
    if
	ReadNode == node() ->
	    %% Already loaded locally
	    fun() -> Done end;
	LocalC == true ->
	    fun() ->
		    Res = mnesia_loader:disc_load_table(Tab, load_local_content),
		    Done#loader_done{reply = Res, needs_announce = true, needs_sync = true}
	    end;
	AccessMode == read_only, Reason /= {dumper,add_table_copy} ->
	    fun() -> disc_load_table(Tab, Reason, ReplyTo) end;
	true ->
	    fun() ->
		    %% Either we cannot read the table yet
		    %% or someone is moving a replica between
		    %% two nodes
		    Cs =  Load#net_load.cstruct,
		    Res = mnesia_loader:net_load_table(Tab, Reason, Active, Cs),
		    case Res of
			{loaded, ok} ->
			    Done#loader_done{needs_sync = true,
					     reply = Res};
			{not_loaded, _} ->
			    Done#loader_done{is_loaded = false,
					     reply = Res}
		    end
	    end
    end;
load_table_fun(Load) when is_record(Load, disc_load) ->
    Tab = Load#disc_load.table,
    Reason =  Load#disc_load.reason,
    ReplyTo = Load#disc_load.opt_reply_to,
    ReadNode = val({Tab, where_to_read}),
    Active = filter_active(Tab),
    %% NOTE(review): the source page ends mid-token here -- "Don" is the
    %% truncated start of the next binding (page 1/5 of 2,103 lines).
    Don

%% [Viewer keyboard-shortcut help, translated from Chinese]
%% Keyboard shortcuts:
%%   Copy code: Ctrl+C · Search code: Ctrl+F · Full screen: F11
%%   Increase font size: Ctrl+= · Decrease font size: Ctrl+-
%%   Show shortcuts: ?