mnesia_controller.erl

From 「OTP是开放电信平台的简称」 ("OTP is short for Open Telecom Platform") · Erlang source · 2,103 lines · page 1 of 5

        Access == read_write,
        LocalC == false,
        StillActive == true,
        RemoteIntent == true ->
            Masters = mnesia_recover:get_master_nodes(Tab),
            case lists:member(Node, Masters) of
                true ->
                    %% The other node is master node for
                    %% the table, accept its load intent
                    late_load_filter(RemoteLoaders, Tab, Nodes, [Node | Acc]);
                false when Masters == [] ->
                    %% The table has no master nodes,
                    %% accept its load intent
                    late_load_filter(RemoteLoaders, Tab, Nodes, [Node | Acc]);
                false ->
                    %% Someone else is master node for
                    %% the table, ignore its load intent
                    late_load_filter(RemoteLoaders, Tab, Nodes, Acc)
            end;
        true ->
            late_load_filter(RemoteLoaders, Tab, Nodes, Acc)
    end;
late_load_filter([], _Tab, _Nodes, Acc) ->
    Acc.

%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------
handle_cast({release_schema_commit_lock, _Owner}, State) ->
    if
        State#state.is_stopping == true ->
            {stop, shutdown, State};
        true ->
            case State#state.dumper_queue of
                [#schema_commit_lock{} | Rest] ->
                    State2 = State#state{dumper_pid = undefined,
                                         dumper_queue = Rest},
                    State3 = opt_start_worker(State2),
                    noreply(State3);
                _ ->
                    noreply(State)
            end
    end;

handle_cast(unblock_controller, State) ->
    if
        State#state.is_stopping == true ->
            {stop, shutdown, State};
        record(hd(State#state.dumper_queue), block_controller) ->
            [_Worker | Rest] = State#state.dumper_queue,
            State2 = State#state{dumper_pid = undefined,
                                 dumper_queue = Rest},
            State3 = opt_start_worker(State2),
            noreply(State3)
    end;

handle_cast({mnesia_down, Node}, State) ->
    maybe_log_mnesia_down(Node),
    mnesia_lib:del({current, db_nodes}, Node),
    mnesia_checkpoint:tm_mnesia_down(Node),
    Alltabs = val({schema, tables}),
    reconfigure_tables(Node, Alltabs),

    %% Done (from an external point of view)
    mnesia_monitor:mnesia_down(?SERVER_NAME, Node),

    %% Fix if we are late_merging against the node that went down
    case State#state.schema_is_merged of
        {false, Node} ->
            spawn(?MODULE, call, [{schema_is_merged, [], late_merge, []}]);
        _ ->
            ignore
    end,

    %% Fix internal stuff
    LateQ = remove_loaders(Alltabs, Node, State#state.late_loader_queue),

    case get_senders(State) ++ get_loaders(State) of
        [] -> ignore;
        Senders ->
            lists:foreach(fun({Pid, _}) -> Pid ! {copier_done, Node} end,
                          Senders)
    end,
    lists:foreach(fun(Pid) -> Pid ! {copier_done, Node} end,
                  State#state.others),

    Remove = fun(ST) ->
                     node(ST#send_table.receiver_pid) /= Node
             end,
    NewSenders = lists:filter(Remove, State#state.sender_queue),
    Early = remove_early_messages(State#state.early_msgs, Node),
    noreply(State#state{sender_queue = NewSenders,
                        early_msgs = Early,
                        late_loader_queue = LateQ});

handle_cast({merging_schema, Node}, State) ->
    case State#state.schema_is_merged of
        false ->
            %% This comes from a dynamic connect_nodes, made
            %% after mnesia:start() and the schema merge.
            ImANewKidInTheBlock =
                (val({schema, storage_type}) == ram_copies)
                andalso (mnesia_lib:val({schema, local_tables}) == [schema]),
            case ImANewKidInTheBlock of
                true ->  %% I'm a newly started ram node
                    noreply(State#state{schema_is_merged = {false, Node}});
                false ->
                    noreply(State)
            end;
        _ ->  %% Already merging schema
            noreply(State)
    end;

handle_cast(Msg, State) when State#state.schema_is_merged /= true ->
    %% Buffer early messages
    Msgs = State#state.early_msgs,
    noreply(State#state{early_msgs = [{cast, Msg} | Msgs]});

%% This must be done after schema_is_merged, otherwise adopt_orphans
%% might trigger a table load from the wrong nodes, since we don't
%% yet know which tables we can safely load first.
handle_cast({im_running, _Node, NewFriends}, State) ->
    Tabs = mnesia_lib:local_active_tables() -- [schema],
    Ns = mnesia_lib:intersect(NewFriends, val({current, db_nodes})),
    abcast(Ns, {adopt_orphans, node(), Tabs}),
    noreply(State);

handle_cast({disc_load, Tab, Reason}, State) ->
    Worker = #disc_load{table = Tab, reason = Reason},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_cast(Worker, State) when record(Worker, send_table) ->
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_cast({sync_tabs, Tabs, From}, State) ->
    %% User initiated wait_for_tables
    handle_sync_tabs(Tabs, From),
    noreply(State);

handle_cast({i_have_tab, Tab, Node}, State) ->
    case lists:member(Node, val({current, db_nodes})) of
        true ->
            State2 = node_has_tabs([Tab], Node, State),
            noreply(State2);
        false ->
            noreply(State)
    end;

handle_cast({force_load_updated, Tab}, State) ->
    case val({Tab, active_replicas}) of
        [] ->
            %% No valid replicas
            noreply(State);
        [SomeNode | _] ->
            State2 = node_has_tabs([Tab], SomeNode, State),
            noreply(State2)
    end;

handle_cast({master_nodes_updated, Tab, Masters}, State) ->
    Active = val({Tab, active_replicas}),
    Valid =
        case val({Tab, load_by_force}) of
            true ->
                Active;
            false ->
                if
                    Masters == [] ->
                        Active;
                    true ->
                        mnesia_lib:intersect(Masters, Active)
                end
        end,
    case Valid of
        [] ->
            %% No valid replicas
            noreply(State);
        [SomeNode | _] ->
            State2 = node_has_tabs([Tab], SomeNode, State),
            noreply(State2)
    end;

handle_cast({adopt_orphans, Node, Tabs}, State) ->
    State2 = node_has_tabs(Tabs, Node, State),

    %% Register the other node as up and running
    mnesia_recover:log_mnesia_up(Node),
    verbose("Logging mnesia_up ~w~n", [Node]),
    mnesia_lib:report_system_event({mnesia_up, Node}),

    %% Load orphan tables
    LocalTabs = val({schema, local_tables}) -- [schema],
    Nodes = val({current, db_nodes}),
    {LocalOrphans, RemoteMasters} =
        orphan_tables(LocalTabs, Node, Nodes, [], []),
    Reason = {adopt_orphan, node()},
    mnesia_late_loader:async_late_disc_load(node(), LocalOrphans, Reason),

    Fun =
        fun(N) ->
                RemoteOrphans =
                    [Tab || {Tab, Ns} <- RemoteMasters,
                            lists:member(N, Ns)],
                mnesia_late_loader:maybe_async_late_disc_load(N, RemoteOrphans, Reason)
        end,
    lists:foreach(Fun, Nodes),
    noreply(State2);

handle_cast(Msg, State) ->
    error("~p got unexpected cast: ~p~n", [?SERVER_NAME, Msg]),
    noreply(State).

handle_sync_tabs([Tab | Tabs], From) ->
    case val({Tab, where_to_read}) of
        nowhere ->
            case get({sync_tab, Tab}) of
                undefined ->
                    put({sync_tab, Tab}, [From]);
                Pids ->
                    put({sync_tab, Tab}, [From | Pids])
            end;
        _ ->
            sync_reply(From, Tab)
    end,
    handle_sync_tabs(Tabs, From);
handle_sync_tabs([], _From) ->
    ok.

%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------
handle_info({async_dump_log, InitBy}, State) ->
    Worker = #dump_log{initiated_by = InitBy},
    State2 = add_worker(Worker, State),
    noreply(State2);

handle_info(Done, State) when record(Done, dumper_done) ->
    Pid = Done#dumper_done.worker_pid,
    Res = Done#dumper_done.worker_res,
    if
        State#state.is_stopping == true ->
            {stop, shutdown, State};
        Res == dumped, Pid == State#state.dumper_pid ->
            [Worker | Rest] = State#state.dumper_queue,
            reply(Worker#dump_log.opt_reply_to, Res),
            State2 = State#state{dumper_pid = undefined,
                                 dumper_queue = Rest},
            State3 = opt_start_worker(State2),
            noreply(State3);
        true ->
            fatal("Dumper failed: ~p~n state: ~p~n", [Res, State]),
            {stop, fatal, State}
    end;

handle_info(Done, State0) when record(Done, loader_done) ->
    WPid = Done#loader_done.worker_pid,
    LateQueue0 = State0#state.late_loader_queue,
    Tab = Done#loader_done.table_name,

    State1 = State0#state{loader_pid =
                              lists:keydelete(WPid, 1, get_loaders(State0))},
    State2 =
        case Done#loader_done.is_loaded of
            true ->
                %% Optional table announcement
                if
                    Done#loader_done.needs_announce == true,
                    Done#loader_done.needs_reply == true ->
                        i_have_tab(Tab),
                        %% Should be {dumper, add_table_copy} only
                        reply(Done#loader_done.reply_to,
                              Done#loader_done.reply);
                    Done#loader_done.needs_reply == true ->
                        %% Should be {dumper, add_table_copy} only
                        reply(Done#loader_done.reply_to,
                              Done#loader_done.reply);
                    Done#loader_done.needs_announce == true, Tab == schema ->
                        i_have_tab(Tab);
                    Done#loader_done.needs_announce == true ->
                        i_have_tab(Tab),
                        %% Local node needs to perform user_sync_tab/1
                        Ns = val({current, db_nodes}),
                        abcast(Ns, {i_have_tab, Tab, node()});
                    Tab == schema ->
                        ignore;
                    true ->
                        %% Local node needs to perform user_sync_tab/1
                        Ns = val({current, db_nodes}),
                        AlreadyKnows = val({Tab, active_replicas}),
                        abcast(Ns -- AlreadyKnows, {i_have_tab, Tab, node()})
                end,
                %% Optional user sync
                case Done#loader_done.needs_sync of
                    true -> user_sync_tab(Tab);
                    false -> ignore
                end,
                State1#state{late_loader_queue =
                                 gb_trees:delete_any(Tab, LateQueue0)};
            false ->
                %% Either the node went down or the table was
                %% not loaded remotely yet
                case Done#loader_done.needs_reply of
                    true ->
                        reply(Done#loader_done.reply_to,
                              Done#loader_done.reply);
                    false ->
                        ignore
                end,
                case ?catch_val({Tab, active_replicas}) of
                    [_ | _] -> %% Still available elsewhere, retry the load
                        {value, {_, Worker}} =
                            lists:keysearch(WPid, 1, get_loaders(State0)),
                        add_loader(Tab, Worker, State1);
                    _ ->
                        State1
                end
        end,
    State3 = opt_start_worker(State2),
    noreply(State3);

handle_info(Done, State) when record(Done, sender_done) ->
    Pid = Done#sender_done.worker_pid,
    Res = Done#sender_done.worker_res,
    Senders = get_senders(State),
    {value, {Pid, _Worker}} = lists:keysearch(Pid, 1, Senders),
    if
        Res == ok ->
            State2 = State#state{sender_pid = lists:keydelete(Pid, 1, Senders)},
            State3 = opt_start_worker(State2),
            noreply(State3);
        true ->
            %% No need to send any message to the table receiver
            %% since it will soon get a mnesia_down anyway
            fatal("Sender failed: ~p~n state: ~p~n", [Res, State]),
            {stop, fatal, State}
    end;

handle_info({'EXIT', Pid, R}, State) when Pid == State#state.supervisor ->
    catch set(mnesia_status, stopping),
    case State#state.dumper_pid of
        undefined ->
            dbg_out("~p was ~p~n", [?SERVER_NAME, R]),
            {stop, shutdown, State};
        _ ->
            noreply(State#state{is_stopping = true})
    end;

handle_info({'EXIT', Pid, R}, State) when Pid == State#state.dumper_pid ->
    case State#state.dumper_queue of
        [#schema_commit_lock{} | Workers] -> %% Schema trans crashed or was killed
            dbg_out("WARNING: Dumper ~p exited ~p~n", [Pid, R]),
            State2 = State#state{dumper_queue = Workers, dumper_pid = undefined},
            State3 = opt_start_worker(State2),
            noreply(State3);
        _Other ->
            fatal("Dumper or schema commit crashed: ~p~n state: ~p~n", [R, State]),
            {stop, fatal, State}
    end;

handle_info(Msg = {'EXIT', Pid, R}, State) when R /= wait_for_tables_timeout ->
    case lists:keymember(Pid, 1, get_senders(State)) of
        true ->
            %% No need to send any message to the table receiver
            %% since it will soon get a mnesia_down anyway
            fatal("Sender crashed: ~p~n state: ~p~n", [{Pid, R}, State]),
            {stop, fatal, State};
        false ->
            case lists:keymember(Pid, 1, get_loaders(State)) of
                true ->
                    fatal("Loader crashed: ~p~n state: ~p~n", [R, State]),
                    {stop, fatal, State};
                false ->
                    error("~p got unexpected info: ~p~n", [?SERVER_NAME, Msg]),
                    noreply(State)
            end
    end;

handle_info({From, get_state}, State) ->
    From ! {?SERVER_NAME, State},
    noreply(State);

%% No real need for buffering
handle_info(Msg, State) when State#state.schema_is_merged /= true ->
    %% Buffer early messages
    Msgs = State#state.early_msgs,
    noreply(State#state{early_msgs = [{info, Msg} | Msgs]});

handle_info({'EXIT', Pid, wait_for_tables_timeout}, State) ->
    sync_tab_timeout(Pid, get()),
    noreply(State);

handle_info(Msg, State) ->
    error("~p got unexpected info: ~p~n", [?SERVER_NAME, Msg]),
    noreply(State).

sync_tab_timeout(Pid, [{{sync_tab, Tab}, Pids} | Tail]) ->
    case lists:delete(Pid, Pids) of
        [] ->
            erase({sync_tab, Tab});
        Pids2 ->
            put({sync_tab, Tab}, Pids2)
    end,
    sync_tab_timeout(Pid, Tail);
sync_tab_timeout(Pid, [_ | Tail]) ->
    sync_tab_timeout(Pid, Tail);
sync_tab_timeout(_Pid, []) ->
    ok.

%% Pick the load record that has the highest load order.
%% Returns {BestLoad, RemainingQueue} or {none, []} if the queue is empty.
pick_next(Queue) ->
    List = gb_trees:values(Queue),
    case pick_next(List, none, none) of

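For context: the {sync_tabs, Tabs, From} cast (commented above as "User initiated wait_for_tables") and the {'EXIT', Pid, wait_for_tables_timeout} clause implement the server side of mnesia:wait_for_tables/2. A minimal caller-side sketch, not part of this module; the module name, wait_or_force/1, and the 5-second timeout are illustrative:

-module(tab_sync_example).  %% hypothetical helper module, for illustration only
-export([wait_or_force/1]).

%% Wait up to 5 s for Tab to become readable on this node. If no
%% replica answers in time, force a load, which bypasses the
%% master-node and active-replica checks the controller performs above.
wait_or_force(Tab) ->
    case mnesia:wait_for_tables([Tab], 5000) of
        ok ->
            ok;
        {timeout, [Tab]} ->
            yes = mnesia:force_load_table(Tab),
            ok;
        {error, Reason} ->
            {error, Reason}
    end.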