mnesia_controller.erl
来自「OTP是开放电信平台的简称」· ERL 代码 · 共 2,103 行 · 第 1/5 页
ERL
2,103 行
lists:foreach(fun(N) -> add_active_replica(Tab, N, Cs) end,
              val({Tab, active_replicas}))
    end,
    update(W).

%% node To now has tab loaded, but this must be undone
%% This code is rpc:call'ed from the tab_copier process
%% when it has *not* released its table lock
%%
%% Removes To from the active replicas of Tab; any failure in doing so
%% is caught and ignored. If this node currently reads Tab from To,
%% mnesia_lib:set_remote_where_to_read/1 is called for Tab.
unannounce_add_table_copy(Tab, To) ->
    catch del_active_replica(Tab, To),
    case catch val({Tab , where_to_read}) of
        To ->
            mnesia_lib:set_remote_where_to_read(Tab);
        _ ->
            ignore
    end.

%% When the debug level is 'trace', subscribes the mnesia_event process
%% to table events for Tab. Then removes the {sync_tab, Tab} entry from
%% the process dictionary and calls sync_reply/2 for every pid that was
%% stored there.
user_sync_tab(Tab) ->
    case val(debug) of
        trace ->
            mnesia_subscr:subscribe(whereis(mnesia_event), {table, Tab});
        _ ->
            ignore
    end,

    case erase({sync_tab, Tab}) of
        undefined ->
            ok;
        Pids ->
            lists:foreach(fun(Pid) -> sync_reply(Pid, Tab) end, Pids)
    end.

%% Records that this node holds Tab: for local_content tables the
%% whereabouts are set through mnesia_lib, otherwise where_to_read is
%% pointed at this node. Finally this node is added as active replica.
i_have_tab(Tab) ->
    case val({Tab, local_content}) of
        true ->
            mnesia_lib:set_local_content_whereabouts(Tab);
        false ->
            set({Tab, where_to_read}, node())
    end,
    add_active_replica(Tab, node()).

%% Calls block_table for Tab on ToNode, then calls add_active_replica
%% (with Tab, ToNode, RemoteS and AccessMode) on ToNode followed by all
%% other current db_nodes. Not applicable to the schema table.
%% Returns ok.
sync_and_block_table_whereabouts(Tab, ToNode, RemoteS, AccessMode) when Tab /= schema ->
    Current = val({current, db_nodes}),
    Ns =
        case lists:member(ToNode, Current) of
            true -> Current -- [ToNode];
            false -> Current
        end,
    remote_call(ToNode, block_table, [Tab]),
    [remote_call(Node, add_active_replica, [Tab, ToNode, RemoteS, AccessMode]) ||
        Node <- [ToNode | Ns]],
    ok.

%% Calls unannounce_add_table_copy(Tab, ToNode) on every current db
%% node, and on ToNode as well when it is not among them.
%% Not applicable to the schema table. Returns ok.
sync_del_table_copy_whereabouts(Tab, ToNode) when Tab /= schema ->
    Current = val({current, db_nodes}),
    Ns =
        case lists:member(ToNode, Current) of
            true -> Current;
            false -> [ToNode | Current]
        end,
    Args = [Tab, ToNode],
    [remote_call(Node, unannounce_add_table_copy, Args) || Node <- Ns],
    ok.

%% Asks the registered controller process for its state.
%% Returns {info, State} where both loader queues have been converted
%% from gb_trees to lists, or {timeout, Timeout} when the controller is
%% not registered or does not answer within Timeout.
get_info(Timeout) ->
    case whereis(?SERVER_NAME) of
        undefined ->
            {timeout, Timeout};
        Pid ->
            Pid ! {self(), get_state},
            receive
                {?SERVER_NAME, State = #state{loader_queue=LQ,late_loader_queue=LLQ}} ->
                    {info,State#state{loader_queue=gb_trees:to_list(LQ),
                                      late_loader_queue=gb_trees:to_list(LLQ)}}
            after
                Timeout ->
                    {timeout, Timeout}
            end
    end.

%% Asks the registered controller process for its state.
%% Returns {workers, Loaders, Senders, DumperPid}, or {timeout, Timeout}
%% when the controller is not registered or does not answer within
%% Timeout.
get_workers(Timeout) ->
    case whereis(?SERVER_NAME) of
        undefined ->
            {timeout, Timeout};
        Pid ->
            Pid ! {self(), get_state},
            receive
                {?SERVER_NAME, State} when is_record(State, state) ->
                    {workers, get_loaders(State), get_senders(State), State#state.dumper_pid}
            after
                Timeout ->
                    {timeout, Timeout}
            end
    end.

%% Prints one line per locally active table to standard output.
info() ->
    Tabs = mnesia_lib:local_active_tables(),
    io:format( "---> Active tables <--- ~n", []),
    info(Tabs).

%% Prints size information for each table in the list: dets size and
%% file size for disc_only_copies tables, ets size and memory for all
%% other storage types.
info([Tab | Tail]) ->
    case val({Tab, storage_type}) of
        disc_only_copies ->
            info_format(Tab,
                        dets:info(Tab, size),
                        dets:info(Tab, file_size),
                        "bytes on disc");
        _ ->
            info_format(Tab,
                        ?ets_info(Tab, size),
                        ?ets_info(Tab, memory),
                        "words of mem")
    end,
    info(Tail);
info([]) -> ok.

%% Formats and prints one table line. Size and Mem must be integers
%% (they are passed to integer_to_list/1); Media is the unit text.
info_format(Tab, Size, Mem, Media) ->
    StrT = mnesia_lib:pad_name(atom_to_list(Tab), 15, []),
    StrS = mnesia_lib:pad_name(integer_to_list(Size), 8, []),
    StrM = mnesia_lib:pad_name(integer_to_list(Mem), 8, []),
    io:format("~s: with ~s records occupying ~s ~s~n",
              [StrT, StrS, StrM, Media]).

%% Handle early arrived messages
%%
%% Dispatches each queued message to the matching handler and threads
%% the state through. Stops at the first {stop, ...} result; for
%% {reply, ...} results the reply is sent to the caller of the call.
handle_early_msgs([Msg | Msgs], State) ->
    %% The messages are in reverse order
    case handle_early_msg(Msg, State) of
%%      {stop, Reason, Reply, State2} -> % Will not happen according to dialyzer
%%          {stop, Reason, Reply, State2};
        {stop, Reason, State2} ->
            {stop, Reason, State2};
        {noreply, State2} ->
            handle_early_msgs(Msgs, State2);
        {reply, Reply, State2} ->
            {call, _Call, From} = Msg,
            reply(From, Reply),
            handle_early_msgs(Msgs, State2)
    end;
handle_early_msgs([], State) ->
    noreply(State).

%% Routes one early message to handle_call/3, handle_cast/2 or
%% handle_info/2 according to its tag.
handle_early_msg({call, Msg, From}, State) ->
    handle_call(Msg, From, State);
handle_early_msg({cast, Msg}, State) ->
    handle_cast(Msg, State);
handle_early_msg({info, Msg}, State) ->
    handle_info(Msg, State).
%% Wraps State in a gen_server style noreply tuple.
noreply(State) ->
    {noreply, State}.

%% Sends Reply to ReplyTo with gen_server:reply/2 unless ReplyTo is
%% undefined. Always returns Reply.
reply(undefined, Reply) ->
    Reply;
reply(ReplyTo, Reply) ->
    gen_server:reply(ReplyTo, Reply),
    Reply.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% Worker management

%% Returns new State
%%
%% Enqueues Worker in the queue that matches its record type and then
%% tries to start a worker via opt_start_worker/1.
add_worker(Worker, State) when is_record(Worker, dump_log) ->
    InitBy = Worker#dump_log.initiated_by,
    Queue = State#state.dumper_queue,
    case lists:keymember(InitBy, #dump_log.initiated_by, Queue) of
        true when Worker#dump_log.opt_reply_to == undefined ->
            %% The same threshold has been exceeded again,
            %% before we have had the possibility to
            %% process the older one.
            DetectedBy = {dump_log, InitBy},
            Event = {mnesia_overload, DetectedBy},
            mnesia_lib:report_system_event(Event);
        _ ->
            %% Either no dump with this initiator is queued, or one is
            %% queued but this request has somebody waiting for a reply.
            %% The latter case previously had no matching clause and
            %% crashed the controller with case_clause.
            ignore
    end,
    Queue2 = Queue ++ [Worker],
    State2 = State#state{dumper_queue = Queue2},
    opt_start_worker(State2);
add_worker(Worker, State) when is_record(Worker, schema_commit_lock) ->
    Queue = State#state.dumper_queue,
    Queue2 = Queue ++ [Worker],
    State2 = State#state{dumper_queue = Queue2},
    opt_start_worker(State2);
add_worker(Worker, State) when is_record(Worker, net_load) ->
    opt_start_worker(add_loader(Worker#net_load.table,Worker,State));
add_worker(Worker, State) when is_record(Worker, send_table) ->
    Queue = State#state.sender_queue,
    State2 = State#state{sender_queue = Queue ++ [Worker]},
    opt_start_worker(State2);
add_worker(Worker, State) when is_record(Worker, disc_load) ->
    opt_start_worker(add_loader(Worker#disc_load.table,Worker,State));
% Block controller should be used for upgrading mnesia.
add_worker(Worker, State) when is_record(Worker, block_controller) ->
    %% Unlike the other workers this one is put first in the queue
    Queue = State#state.dumper_queue,
    Queue2 = [Worker | Queue],
    State2 = State#state{dumper_queue = Queue2},
    opt_start_worker(State2).

%% Inserts Worker under key Tab in the loader queue (a gb_tree).
%% If Tab already has an entry the state is returned unchanged.
add_loader(Tab,Worker,State = #state{loader_queue=LQ0}) ->
    case gb_trees:is_defined(Tab, LQ0) of
        true -> State;
        false ->
            LQ=gb_trees:insert(Tab, Worker, LQ0),
            State#state{loader_queue=LQ}
    end.

%% Optionally start a worker
%%
%% Dumpers and loaders may run simultaneously
%% but
%% neither of them may run during schema commit.
%% Loaders may not start if a schema commit is enqueued.
%%
%% Returns the (possibly updated) State. Nothing is started while the
%% controller is stopping.
opt_start_worker(State) when State#state.is_stopping == true ->
    State;
opt_start_worker(State) ->
    %% Prioritize dumper and schema commit
    %% by checking them first
    case State#state.dumper_queue of
        [Worker | _Rest] when State#state.dumper_pid == undefined ->
            %% Great, a worker in queue and neither
            %% a schema transaction is being
            %% committed and nor a dumper is running

            %% Start worker but keep him in the queue
            if
                is_record(Worker, schema_commit_lock) ->
                    ReplyTo = Worker#schema_commit_lock.owner,
                    reply(ReplyTo, granted),
                    {Owner, _Tag} = ReplyTo,
                    opt_start_loader(State#state{dumper_pid = Owner});

                is_record(Worker, dump_log) ->
                    Pid = spawn_link(?MODULE, dump_and_reply, [self(), Worker]),
                    State2 = State#state{dumper_pid = Pid},

                    %% If the worker was a dumper we may
                    %% possibly be able to start a loader
                    %% or sender
                    State3 = opt_start_sender(State2),
                    opt_start_loader(State3);

                is_record(Worker, block_controller) ->
                    %% Only granted when no sender and no loader is running
                    case {get_senders(State), get_loaders(State)} of
                        {[], []} ->
                            ReplyTo = Worker#block_controller.owner,
                            reply(ReplyTo, granted),
                            {Owner, _Tag} = ReplyTo,
                            State#state{dumper_pid = Owner};
                        _ ->
                            State
                    end
            end;
        _ ->
            %% Bad luck, try with a loader or sender instead
            State2 = opt_start_sender(State),
            opt_start_loader(State2)
    end.

%% Walks the sender queue and starts the senders that can be started.
%% Returns State with sender_pid and sender_queue updated.
opt_start_sender(State) ->
    case State#state.sender_queue of
        []->
            State;    %% No need
        SenderQ ->
            {NewS,Kept} = opt_start_sender2(SenderQ, get_senders(State),
                                            [], get_loaders(State)),
            State#state{sender_pid = NewS, sender_queue = Kept}
    end.

%% Returns {Pids, Kept}: Pids is the list of {Pid, Sender} for running
%% senders (new ones are prepended), Kept holds the senders that have
%% to wait. A sender whose table is not active on this node is dropped
%% after its receiver has been sent {copier_done, node()}.
opt_start_sender2([], Pids,Kept, _) -> {Pids,Kept};
opt_start_sender2([Sender|R], Pids, Kept, LoaderQ) ->
    Tab = Sender#send_table.table,
    Active = val({Tab, active_replicas}),
    IgotIt = lists:member(node(), Active),
    IsLoading = lists:any(fun({_Pid,Loader}) ->
                                  Tab == element(#net_load.table, Loader)
                          end, LoaderQ),
    if
        IgotIt, IsLoading ->
            %% I'm currently finishing loading the table let him wait
            opt_start_sender2(R,Pids, [Sender|Kept], LoaderQ);
        IgotIt ->
            %% Start worker but keep him in the queue
            Pid = spawn_link(?MODULE, send_and_reply,[self(), Sender]),
            opt_start_sender2(R,[{Pid,Sender}|Pids],Kept,LoaderQ);
        true ->
            verbose("Send table failed ~p not active on this node ~n", [Tab]),
            Sender#send_table.receiver_pid ! {copier_done, node()},
            opt_start_sender2(R,Pids, Kept, LoaderQ)
    end.

%% Starts the next queued loader when the loader queue is non-empty,
%% fewer than max_loaders() loaders are running and no
%% schema_commit_lock is waiting in the dumper queue.
%% Queue entries for tables that are already being loaded are skipped.
%% Returns the (possibly updated) State.
opt_start_loader(State = #state{loader_queue = LoaderQ}) ->
    Current = get_loaders(State),
    Max = max_loaders(),
    case gb_trees:is_empty(LoaderQ) of
        true ->
            State;
        _ when length(Current) >= Max ->
            State;
        false ->
            SchemaQueue = State#state.dumper_queue,
            case lists:keymember(schema_commit_lock, 1, SchemaQueue) of
                false ->
                    case pick_next(LoaderQ) of
                        {none,Rest} ->
                            State#state{loader_queue=Rest};
                        {Worker,Rest} ->
                            case already_loading(Worker, get_loaders(State)) of
                                true ->
                                    opt_start_loader(State#state{loader_queue = Rest});
                                false ->
                                    %% Start worker but keep him in the queue
                                    Pid = load_and_reply(self(), Worker),
                                    State#state{loader_pid=[{Pid,Worker}|get_loaders(State)],
                                                loader_queue = Rest}
                            end
                    end;
                true ->
                    %% Bad luck, we must wait for the schema commit
                    State
            end
    end.

%% True when some running loader (net_load or disc_load) already
%% handles the table of the given worker.
already_loading(#net_load{table=Tab},Loaders) ->
    already_loading2(Tab,Loaders);
already_loading(#disc_load{table=Tab},Loaders) ->
    already_loading2(Tab,Loaders).

already_loading2(Tab, [{_,#net_load{table=Tab}}|_]) -> true;
already_loading2(Tab, [{_,#disc_load{table=Tab}}|_]) -> true;
already_loading2(Tab, [_|Rest]) -> already_loading2(Tab,Rest);
already_loading2(_,[]) -> false.

%% Casts a #send_table{} request for Tab to the controller on Node.
start_remote_sender(Node, Tab, Receiver, Storage) ->
    Msg = #send_table{table = Tab,
                      receiver_pid = Receiver,
                      remote_storage = Storage},
    gen_server:cast({?SERVER_NAME, Node}, Msg).

%% Body of the dumper worker process: dumps the log, reports the result
%% to ReplyTo as a #dumper_done{} message, unlinks and exits normally.
dump_and_reply(ReplyTo, Worker) ->
    %% No trap_exit, die intentionally instead
    Res = mnesia_dumper:opt_dump_log(Worker#dump_log.initiated_by),
    ReplyTo ! #dumper_done{worker_pid = self(),
                           worker_res = Res},
    unlink(ReplyTo),
    exit(normal).

%% Body of the sender worker process: sends the table to the receiver,
%% reports the result to ReplyTo as a #sender_done{} message, unlinks
%% and exits normally.
send_and_reply(ReplyTo, Worker) ->
    %% No trap_exit, die intentionally instead
    Res = mnesia_loader:send_table(Worker#send_table.receiver_pid,
                                   Worker#send_table.table,
                                   Worker#send_table.remote_storage),
    ReplyTo ! #sender_done{worker_pid = self(),
                           worker_res = Res},
    unlink(ReplyTo),
    exit(normal).

%% Spawns a linked loader process and returns its pid. The load fun is
%% built in the calling process; the spawned process traps exits, runs
%% the fun, sends the resulting #loader_done{} (with worker_pid filled
%% in) to ReplyTo, unlinks and exits normally.
load_and_reply(ReplyTo, Worker) ->
    Load = load_table_fun(Worker),
    SendAndReply =
        fun() ->
                process_flag(trap_exit, true),
                Done = Load(),
                ReplyTo ! Done#loader_done{worker_pid = self()},
                unlink(ReplyTo),
                exit(normal)
        end,
    spawn_link(SendAndReply).

%% Now it is time to load the table
%% but first we must check if it still is necessary
%%
%% Returns a fun of arity 0 that performs the load (if any is needed)
%% and returns a #loader_done{} record.
load_table_fun(Load) when is_record(Load, net_load) ->
    Tab = Load#net_load.table,
    ReplyTo = Load#net_load.opt_reply_to,
    Reason = Load#net_load.reason,
    LocalC = val({Tab, local_content}),
    AccessMode = val({Tab, access_mode}),
    ReadNode = val({Tab, where_to_read}),
    Active = filter_active(Tab),
    Done = #loader_done{is_loaded = true,
                        table_name = Tab,
                        needs_announce = false,
                        needs_sync = false,
                        needs_reply = (ReplyTo /= undefined),
                        reply_to = ReplyTo,
                        reply = {loaded, ok}
                       },
    if
        ReadNode == node() ->
            %% Already loaded locally
            fun() -> Done end;
        LocalC == true ->
            fun() ->
                    Res = mnesia_loader:disc_load_table(Tab, load_local_content),
                    Done#loader_done{reply = Res, needs_announce = true, needs_sync = true}
            end;
        AccessMode == read_only, Reason /= {dumper,add_table_copy} ->
            fun() -> disc_load_table(Tab, Reason, ReplyTo) end;
        true ->
            fun() ->
                    %% Either we cannot read the table yet
                    %% or someone is moving a replica between
                    %% two nodes
                    Cs = Load#net_load.cstruct,
                    Res = mnesia_loader:net_load_table(Tab, Reason, Active, Cs),
                    case Res of
                        {loaded, ok} ->
                            Done#loader_done{needs_sync = true,
                                             reply = Res};
                        {not_loaded, _} ->
                            Done#loader_done{is_loaded = false,
                                             reply = Res}
                    end
            end
    end;
load_table_fun(Load) when is_record(Load, disc_load) ->
    Tab = Load#disc_load.table,
    Reason =
Load#disc_load.reason, ReplyTo = Load#disc_load.opt_reply_to, ReadNode = val({Tab, where_to_read}), Active = filter_active(Tab), Don
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?