mnesia_monitor.erl
字号:
%% in this release we should be able to handle the previous %% protocol case hd(Protocols) of ?previous_protocol_version -> accept_protocol(Mon, MyVersion, ?previous_protocol_version, From, State); _ -> verbose("Connection with ~p rejected. " "version = ~p, protocols = ~p, " "expected version = ~p, expected protocol = ~p~n", [node(Mon), Version, Protocols, MyVersion, Protocol]), {reply, {node(), {reject, self(), MyVersion, Protocol}}, State} end end;%% Local request to negotiate with other monitors (nodes).handle_call({negotiate_protocol, Nodes}, From, State) -> case mnesia_lib:intersect(State#state.going_down, Nodes) of [] -> spawn_link(?MODULE, negotiate_protocol_impl, [Nodes, From]), {noreply, State#state{connecting={From,Nodes}}}; _ -> %% Cannot connect now, still processing mnesia down {reply, busy, State} end;handle_call(init, _From, State) -> net_kernel:monitor_nodes(true), EarlyNodes = State#state.early_connects, State2 = State#state{tm_started = true}, {reply, EarlyNodes, State2};handle_call(Msg, _From, State) -> error("~p got unexpected call: ~p~n", [?MODULE, Msg]), {noreply, State}.accept_protocol(Mon, Version, Protocol, From, State) -> Reply = {node(), {accept, self(), Version, Protocol}}, Node = node(Mon), Pending0 = State#state.pending_negotiators, Pending = lists:keydelete(Node, 1, Pending0), case lists:member(Node, State#state.going_down) of true -> %% Wait for the mnesia_down to be processed, %% before we reply P = Pending ++ [{Node, Mon, From, Reply}], {noreply, State#state{pending_negotiators = P}}; false -> %% No need for wait link(Mon), %% link to remote Monitor case Protocol == protocol_version() of true -> set({protocol, Node}, {Protocol, false}); false -> set({protocol, Node}, {Protocol, true}) end, {reply, Reply, State#state{pending_negotiators = Pending}} end.%%----------------------------------------------------------------------%% Func: handle_cast/2%% Returns: {noreply, State} |%% {noreply, State, Timeout} |%% {stop, Reason, State} (terminate/2 is called)%%----------------------------------------------------------------------handle_cast({mnesia_down, mnesia_controller, Node}, State) -> mnesia_tm:mnesia_down(Node), {noreply, State};handle_cast({mnesia_down, mnesia_tm, {Node, Pending}}, State) -> mnesia_locker:mnesia_down(Node, Pending), {noreply, State};handle_cast({mnesia_down, mnesia_locker, Node}, State) -> Down = {mnesia_down, Node}, mnesia_lib:report_system_event(Down), GoingDown = lists:delete(Node, State#state.going_down), State2 = State#state{going_down = GoingDown}, Pending = State#state.pending_negotiators, case lists:keysearch(Node, 1, Pending) of {value, {Node, Mon, ReplyTo, Reply}} -> %% Late reply to remote monitor link(Mon), %% link to remote Monitor gen_server:reply(ReplyTo, Reply), P2 = lists:keydelete(Node, 1,Pending), State3 = State2#state{pending_negotiators = P2}, process_q(State3); false -> %% No pending remote monitors {noreply, State2} end;handle_cast({disconnect, Node}, State) -> case rpc:call(Node, erlang, whereis, [?MODULE]) of {badrpc, _} -> ignore; undefined -> ignore; RemoteMon when pid(RemoteMon) -> unlink(RemoteMon) end, {noreply, State};handle_cast({inconsistent_database, Context, Node}, State) -> Msg = {inconsistent_database, Context, Node}, mnesia_lib:report_system_event(Msg), {noreply, State};handle_cast(Msg, State) -> error("~p got unexpected cast: ~p~n", [?MODULE, Msg]), {noreply, State}.%%----------------------------------------------------------------------%% Func: handle_info/2%% Returns: {noreply, State} |%% {noreply, State, Timeout} |%% {stop, Reason, State} (terminate/2 is called)%%----------------------------------------------------------------------handle_info({'EXIT', Pid, R}, State) when Pid == State#state.supervisor -> dbg_out("~p was ~p by supervisor~n",[?MODULE, R]), {stop, R, State};handle_info({'EXIT', Pid, fatal}, State) when node(Pid) == node() -> dbg_out("~p got FATAL ERROR from: ~p~n",[?MODULE, Pid]), exit(State#state.supervisor, shutdown), {noreply, State};handle_info(Msg = {'EXIT',Pid,_}, State) -> Node = node(Pid), if Node /= node(), State#state.connecting == undefined -> %% Remotly linked process died, assume that it was a mnesia_monitor mnesia_recover:mnesia_down(Node), mnesia_controller:mnesia_down(Node), {noreply, State#state{going_down = [Node | State#state.going_down]}}; Node /= node() -> {noreply, State#state{mq = State#state.mq ++ [{info, Msg}]}}; true -> %% We have probably got an exit signal from %% disk_log or dets Hint = "Hint: check that the disk still is writable", fatal("~p got unexpected info: ~p; ~p~n", [?MODULE, Msg, Hint]) end;handle_info({protocol_negotiated, From,Res}, State) -> From = element(1,State#state.connecting), gen_server:reply(From, Res), process_q(State#state{connecting = undefined});handle_info({nodeup, Node}, State) -> %% Ok, we are connected to yet another Erlang node %% Let's check if Mnesia is running there in order %% to detect if the network has been partitioned %% due to communication failure. HasDown = mnesia_recover:has_mnesia_down(Node), ImRunning = mnesia_lib:is_running(), if %% If I'm not running the test will be made later. HasDown == true, ImRunning == yes -> spawn_link(?MODULE, detect_partitioned_network, [self(), Node]); true -> ignore end, {noreply, State};handle_info({nodedown, _Node}, State) -> %% Ignore, we are only caring about nodeup's {noreply, State};handle_info({disk_log, _Node, Log, Info}, State) -> case Info of {truncated, _No} -> ok; _ -> mnesia_lib:important("Warning Log file ~p error reason ~s~n", [Log, disk_log:format_error(Info)]) end, {noreply, State};handle_info(Msg, State) -> error("~p got unexpected info (~p): ~p~n", [?MODULE, State, Msg]).process_q(State = #state{mq=[]}) -> {noreply,State};process_q(State = #state{mq=[{info,Msg}|R]}) -> handle_info(Msg, State#state{mq=R});process_q(State = #state{mq=[{cast,Msg}|R]}) -> handle_cast(Msg, State#state{mq=R});process_q(State = #state{mq=[{call,From,Msg}|R]}) -> handle_call(Msg, From, State#state{mq=R}).%%----------------------------------------------------------------------%% Func: terminate/2%% Purpose: Shutdown the server%% Returns: any (ignored by gen_server)%%----------------------------------------------------------------------terminate(Reason, State) -> terminate_proc(?MODULE, Reason, State).%%----------------------------------------------------------------------%% Func: code_change/3%% Purpose: Upgrade process when its code is to be changed%% Returns: {ok, NewState}%%----------------------------------------------------------------------code_change(_, {state, SUP, PN, GD, TMS, EC}, _) -> {ok, #state{supervisor=SUP, pending_negotiators=PN, going_down = GD, tm_started =TMS, early_connects = EC}};code_change(_OldVsn, State, _Extra) -> {ok, State}.%%%----------------------------------------------------------------------%%% Internal functions%%%----------------------------------------------------------------------process_config_args([]) -> ok;process_config_args([C|T]) -> V = get_env(C), dbg_out("Env ~p: ~p~n", [C, V]), mnesia_lib:set(C, V), process_config_args(T).set_env(E,Val) -> mnesia_lib:set(E, check_type(E,Val)), ok.get_env(E) -> case ?catch_val(E) of {'EXIT', _} -> case application:get_env(mnesia, E) of {ok, Val} -> check_type(E, Val); undefined -> check_type(E, default_env(E)) end; Val -> Val end.env() -> [ access_module, auto_repair, backup_module, debug, dir, dump_log_load_regulation, dump_log_time_threshold, dump_log_update_in_place, dump_log_write_threshold, embedded_mnemosyne, event_module, extra_db_nodes, ignore_fallback_at_startup, fallback_error_function, max_wait_for_decision, schema_location, core_dir, pid_sort_order, no_table_loaders, dc_dump_limit ].default_env(access_module) -> mnesia;default_env(auto_repair) -> true;default_env(backup_module) -> mnesia_backup;default_env(debug) -> none;default_env(dir) -> Name = lists:concat(["Mnesia.", node()]), filename:absname(Name);default_env(dump_log_load_regulation) -> false;default_env(dump_log_time_threshold) -> timer:minutes(3);default_env(dump_log_update_in_place) -> true;default_env(dump_log_write_threshold) -> 1000;default_env(embedded_mnemosyne) -> false;default_env(event_module) -> mnesia_event;default_env(extra_db_nodes) -> [];default_env(ignore_fallback_at_startup) -> false;default_env(fallback_error_function) -> {mnesia, lkill};default_env(max_wait_for_decision) -> infinity;default_env(schema_location) -> opt_disc;default_env(core_dir) -> false;default_env(pid_sort_order) -> false;default_env(no_table_loaders) -> 2;default_env(dc_dump_limit) -> 4.check_type(Env, Val) -> case catch do_check_type(Env, Val) of {'EXIT', _Reason} -> exit({bad_config, Env, Val}); NewVal -> NewVal end. do_check_type(access_module, A) when atom(A) -> A;do_check_type(auto_repair, B) -> bool(B);do_check_type(backup_module, B) when atom(B) -> B;do_check_type(debug, debug) -> debug;do_check_type(debug, false) -> none;do_check_type(debug, none) -> none;do_check_type(debug, trace) -> trace;do_check_type(debug, true) -> debug;do_check_type(debug, verbose) -> verbose;do_check_type(dir, V) -> filename:absname(V);do_check_type(dump_log_load_regulation, B) -> bool(B);do_check_type(dump_log_time_threshold, I) when integer(I), I > 0 -> I;do_check_type(dump_log_update_in_place, B) -> bool(B);do_check_type(dump_log_write_threshold, I) when integer(I), I > 0 -> I;do_check_type(event_module, A) when atom(A) -> A;do_check_type(ignore_fallback_at_startup, B) -> bool(B);do_check_type(fallback_error_function, {Mod, Func}) when atom(Mod), atom(Func) -> {Mod, Func};do_check_type(embedded_mnemosyne, B) -> bool(B);do_check_type(extra_db_nodes, L) when list(L) -> Fun = fun(N) when N == node() -> false; (A) when atom(A) -> true end, lists:filter(Fun, L);do_check_type(max_wait_for_decision, infinity) -> infinity;do_check_type(max_wait_for_decision, I) when integer(I), I > 0 -> I;do_check_type(schema_location, M) -> media(M);do_check_type(core_dir, "false") -> false;do_check_type(core_dir, false) -> false;do_check_type(core_dir, Dir) when list(Dir) -> Dir;do_check_type(pid_sort_order, r9b_plain) -> r9b_plain;do_check_type(pid_sort_order, "r9b_plain") -> r9b_plain;do_check_type(pid_sort_order, standard) -> standard;do_check_type(pid_sort_order, "standard") -> standard;do_check_type(pid_sort_order, _) -> false;do_check_type(no_table_loaders, N) when is_integer(N), N > 0 -> N;do_check_type(dc_dump_limit,N) when is_number(N), N > 0 -> N.bool(true) -> true;bool(false) -> false.media(disc) -> disc;media(opt_disc) -> opt_disc;media(ram) -> ram.patch_env(Env, Val) -> case catch do_check_type(Env, Val) of {'EXIT', _Reason} -> {error, {bad_type, Env, Val}}; NewVal -> application_controller:set_env(mnesia, Env, NewVal), NewVal end.detect_partitioned_network(Mon, Node) -> detect_inconcistency([Node], running_partitioned_network), unlink(Mon), exit(normal).detect_inconcistency([], _Context) -> ok;detect_inconcistency(Nodes, Context) -> Downs = [N || N <- Nodes, mnesia_recover:has_mnesia_down(N)], {Replies, _BadNodes} = rpc:multicall(Downs, ?MODULE, has_remote_mnesia_down, [node()]), report_inconsistency(Replies, Context, ok).has_remote_mnesia_down(Node) -> HasDown = mnesia_recover:has_mnesia_down(Node), Master = mnesia_recover:get_master_nodes(schema), if HasDown == true, Master == [] -> {true, node()}; true -> {false, node()} end.report_inconsistency([{true, Node} | Replies], Context, _Status) -> %% Oops, Mnesia is already running on the %% other node AND we both regard each %% other as down. The database is %% potentially inconsistent and we has to %% do tell the applications about it, so %% they may perform some clever recovery %% action. Msg = {inconsistent_database, Context, Node}, mnesia_lib:report_system_event(Msg), report_inconsistency(Replies, Context, inconsistent_database);report_inconsistency([{false, _Node} | Replies], Context, Status) -> report_inconsistency(Replies, Context, Status);report_inconsistency([{badrpc, _Reason} | Replies], Context, Status) -> report_inconsistency(Replies, Context, Status);report_inconsistency([], _Context, Status) -> Status.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -