欢迎来到虫虫下载站 | 资源下载 资源专辑 关于我们
虫虫下载站

mnesia_monitor.erl

OTP是开放电信平台的简称
ERL
第 1 页 / 共 2 页
字号:
	    %% in this release we should be able to handle the previous
	    %% protocol
	    case hd(Protocols) of
		?previous_protocol_version ->
		    accept_protocol(Mon, MyVersion, ?previous_protocol_version, From, State);
		_ ->
		    verbose("Connection with ~p rejected. "
			    "version = ~p, protocols = ~p, "
			    "expected version = ~p, expected protocol = ~p~n",
			    [node(Mon), Version, Protocols, MyVersion, Protocol]),
		    {reply, {node(), {reject, self(), MyVersion, Protocol}}, State}
	    end
    end;

%% Local request to negotiate with other monitors (nodes).
handle_call({negotiate_protocol, Nodes}, From, State) ->
    case mnesia_lib:intersect(State#state.going_down, Nodes) of
	[] ->
	    spawn_link(?MODULE, negotiate_protocol_impl, [Nodes, From]),
	    {noreply, State#state{connecting={From,Nodes}}};
	_ ->  %% Cannot connect now, still processing mnesia down
	    {reply, busy, State}
    end;

handle_call(init, _From, State) ->
    net_kernel:monitor_nodes(true),
    EarlyNodes = State#state.early_connects,
    State2 = State#state{tm_started = true},
    {reply, EarlyNodes, State2};

handle_call(Msg, _From, State) ->
    error("~p got unexpected call: ~p~n", [?MODULE, Msg]),
    {noreply, State}.

%% Accept a protocol negotiation request from the remote monitor Mon.
%%
%% Any earlier pending entry for Mon's node is dropped first.  If the
%% node is still listed in going_down, the reply is parked in
%% pending_negotiators and {noreply, State} is returned; the reply is
%% then sent from handle_cast({mnesia_down, mnesia_locker, Node}, ...).
%% Otherwise Mon is linked, the protocol is stored with set/2 and the
%% reply is returned at once.
accept_protocol(Mon, Version, Protocol, From, State) ->
    Reply = {node(), {accept, self(), Version, Protocol}},
    Node = node(Mon),
    Pending0 = State#state.pending_negotiators,
    Pending = lists:keydelete(Node, 1, Pending0),
    case lists:member(Node, State#state.going_down) of
	true ->
	    %% Wait for the mnesia_down to be processed,
	    %% before we reply
	    P = Pending ++ [{Node, Mon, From, Reply}],
	    {noreply, State#state{pending_negotiators = P}};
	false ->
	    %% No need for wait
	    link(Mon),  %% link to remote Monitor
	    %% The flag stored next to the protocol is false when the
	    %% current protocol version was agreed upon and true otherwise
	    %% (presumably a "needs conversion" marker -- confirm with the
	    %% readers of {protocol, Node}).
	    case Protocol == protocol_version() of
		true ->
		    set({protocol, Node}, {Protocol, false});
		false ->
		    set({protocol, Node}, {Protocol, true})
	    end,
	    {reply, Reply, State#state{pending_negotiators = Pending}}
    end.

%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------

%% The mnesia_down notification is passed along a chain: the cast tagged
%% mnesia_controller triggers mnesia_tm, the one tagged mnesia_tm
%% triggers mnesia_locker, and the one tagged mnesia_locker finishes the
%% processing here.
handle_cast({mnesia_down, mnesia_controller, Node}, State) ->
    mnesia_tm:mnesia_down(Node),
    {noreply, State};

handle_cast({mnesia_down, mnesia_tm, {Node, Pending}}, State) ->
    mnesia_locker:mnesia_down(Node, Pending),
    {noreply, State};

handle_cast({mnesia_down, mnesia_locker, Node}, State) ->
    Down = {mnesia_down, Node},
    mnesia_lib:report_system_event(Down),
    GoingDown = lists:delete(Node, State#state.going_down),
    State2 = State#state{going_down = GoingDown},
    Pending = State#state.pending_negotiators,
    case lists:keysearch(Node, 1, Pending) of
	{value, {Node, Mon, ReplyTo, Reply}} ->
	    %% Late reply to remote monitor
	    link(Mon),  %% link to remote Monitor
	    gen_server:reply(ReplyTo, Reply),
	    P2 = lists:keydelete(Node, 1,Pending),
	    State3 = State2#state{pending_negotiators = P2},
	    process_q(State3);
	false ->
	    %% No pending remote monitors
	    {noreply, State2}
    end;

handle_cast({disconnect, Node}, State) ->
    %% Unlink from the monitor registered on Node, if one can be found.
    case rpc:call(Node, erlang, whereis, [?MODULE]) of
	{badrpc, _} ->
	    ignore;
	undefined ->
	    ignore;
	RemoteMon when is_pid(RemoteMon) ->
	    unlink(RemoteMon)
    end,
    {noreply, State};

handle_cast({inconsistent_database, Context, Node}, State) ->
    Msg = {inconsistent_database, Context, Node},
    mnesia_lib:report_system_event(Msg),
    {noreply, State};

handle_cast(Msg, State) ->
    error("~p got unexpected cast: ~p~n", [?MODULE, Msg]),
    {noreply, State}.

%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------

handle_info({'EXIT', Pid, R}, State) when Pid == State#state.supervisor ->
    dbg_out("~p was ~p by supervisor~n",[?MODULE, R]),
    {stop, R, State};

handle_info({'EXIT', Pid, fatal}, State) when node(Pid) == node() ->
    dbg_out("~p got FATAL ERROR from: ~p~n",[?MODULE, Pid]),
    exit(State#state.supervisor, shutdown),
    {noreply, State};

handle_info(Msg = {'EXIT',Pid,_}, State) ->
    Node = node(Pid),
    if
	Node /= node(), State#state.connecting == undefined ->
	    %% Remotely linked process died, assume that it was a mnesia_monitor
	    mnesia_recover:mnesia_down(Node),
	    mnesia_controller:mnesia_down(Node),
	    {noreply, State#state{going_down = [Node | State#state.going_down]}};
	Node /= node() ->
	    %% A negotiation is in progress: defer the exit message until
	    %% process_q/1 is run.
	    {noreply, State#state{mq = State#state.mq ++ [{info, Msg}]}};
	true ->
	    %% We have probably got an exit signal from
	    %% disk_log or dets
	    Hint = "Hint: check that the disk still is writable",
	    fatal("~p got unexpected info: ~p; ~p~n",
		  [?MODULE, Msg, Hint])
    end;

handle_info({protocol_negotiated, From,Res}, State) ->
    %% Assert that the result belongs to the negotiation we started.
    From = element(1,State#state.connecting),
    gen_server:reply(From, Res),
    process_q(State#state{connecting = undefined});

handle_info({nodeup, Node}, State) ->
    %% Ok, we are connected to yet another Erlang node
    %% Let's check if Mnesia is running there in order
    %% to detect if the network has been partitioned
    %% due to communication failure.

    HasDown   = mnesia_recover:has_mnesia_down(Node),
    ImRunning = mnesia_lib:is_running(),

    if
	%% If I'm not running the test will be made later.
	HasDown == true, ImRunning == yes ->
	    spawn_link(?MODULE, detect_partitioned_network, [self(), Node]);
	true ->
	    ignore
    end,
    {noreply, State};

handle_info({nodedown, _Node}, State) ->
    %% Ignore, we are only caring about nodeup's
    {noreply, State};

handle_info({disk_log, _Node, Log, Info}, State) ->
    case Info of
	{truncated, _No} ->
	    ok;
	_ ->
	    mnesia_lib:important("Warning Log file ~p error reason ~s~n",
				 [Log, disk_log:format_error(Info)])
    end,
    {noreply, State};

handle_info(Msg, State) ->
    %% Log and drop anything else.  The explicit {noreply, State} is
    %% needed: the result of error/2 alone is not one of the return
    %% values listed in the header above.
    error("~p got unexpected info (~p): ~p~n", [?MODULE, State, Msg]),
    {noreply, State}.

%% Dispatch the first entry of the deferred message queue (mq) to the
%% matching callback with that entry removed from the queue.  Returns
%% {noreply, State} when the queue is empty.  Only the head entry is
%% dispatched per invocation.
process_q(State = #state{mq=[]}) -> {noreply,State};
process_q(State = #state{mq=[{info,Msg}|R]}) ->
    handle_info(Msg, State#state{mq=R});
process_q(State = #state{mq=[{cast,Msg}|R]}) ->
    handle_cast(Msg, State#state{mq=R});
process_q(State = #state{mq=[{call,From,Msg}|R]}) ->
    handle_call(Msg, From, State#state{mq=R}).

%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
terminate(Reason, State) ->
    terminate_proc(?MODULE, Reason, State).

%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Upgrade process when its code is to be changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------

%% A bare 6-element state tuple (presumably an older record layout) is
%% rebuilt as a #state{} record; fields not named here get the record
%% defaults.
code_change(_, {state, SUP, PN, GD, TMS, EC}, _) ->
    {ok, #state{supervisor=SUP, pending_negotiators=PN,
		going_down = GD, tm_started =TMS, early_connects = EC}};

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

%% Resolve every parameter in the list with get_env/1 and store the
%% value with mnesia_lib:set/2.
process_config_args([]) ->
    ok;
process_config_args([C|T]) ->
    V = get_env(C),
    dbg_out("Env ~p: ~p~n", [C, V]),
    mnesia_lib:set(C, V),
    process_config_args(T).

%% Type-check Val for parameter E and store the checked value with
%% mnesia_lib:set/2.  Returns ok; exits via check_type/2 on a bad value.
set_env(E,Val) ->
    mnesia_lib:set(E, check_type(E,Val)),
    ok.

%% Return the value of parameter E.  When ?catch_val(E) yields
%% {'EXIT', _} the value is taken from the mnesia application
%% environment, or from default_env/1 if it is not set there, and is
%% type-checked before it is returned.
get_env(E) ->
    case ?catch_val(E) of
	{'EXIT', _} ->
	    case application:get_env(mnesia, E) of
		{ok, Val} ->
		    check_type(E, Val);
		undefined ->
		    check_type(E, default_env(E))
	    end;
	Val ->
	    Val
    end.

%% Names of the configuration parameters; each one has a default_env/1
%% clause below.
env() ->
    [
     access_module,
     auto_repair,
     backup_module,
     debug,
     dir,
     dump_log_load_regulation,
     dump_log_time_threshold,
     dump_log_update_in_place,
     dump_log_write_threshold,
     embedded_mnemosyne,
     event_module,
     extra_db_nodes,
     ignore_fallback_at_startup,
     fallback_error_function,
     max_wait_for_decision,
     schema_location,
     core_dir,
     pid_sort_order,
     no_table_loaders,
     dc_dump_limit
    ].

%% Built-in default value for each configuration parameter.
default_env(access_module) ->
    mnesia;
default_env(auto_repair) ->
    true;
default_env(backup_module) ->
    mnesia_backup;
default_env(debug) ->
    none;
default_env(dir) ->
    Name = lists:concat(["Mnesia.", node()]),
    filename:absname(Name);
default_env(dump_log_load_regulation) ->
    false;
default_env(dump_log_time_threshold) ->
    timer:minutes(3);
default_env(dump_log_update_in_place) ->
    true;
default_env(dump_log_write_threshold) ->
    1000;
default_env(embedded_mnemosyne) ->
    false;
default_env(event_module) ->
    mnesia_event;
default_env(extra_db_nodes) ->
    [];
default_env(ignore_fallback_at_startup) ->
    false;
default_env(fallback_error_function) ->
    {mnesia, lkill};
default_env(max_wait_for_decision) ->
    infinity;
default_env(schema_location) ->
    opt_disc;
default_env(core_dir) ->
    false;
default_env(pid_sort_order) ->
    false;
default_env(no_table_loaders) ->
    2;
default_env(dc_dump_limit) ->
    4.

%% Validate (and normalise) Val for parameter Env with do_check_type/2.
%% Exits with {bad_config, Env, Val} when the check crashes.
check_type(Env, Val) ->
    case catch do_check_type(Env, Val) of
	{'EXIT', _Reason} ->
	    exit({bad_config, Env, Val});
	NewVal ->
	    NewVal
    end.
%% Validate the value of one configuration parameter and return its
%% normalised form.  There is deliberately no catch-all clause: a value
%% of the wrong type (or an unknown parameter) crashes with
%% function_clause, which check_type/2 and patch_env/2 trap.
do_check_type(access_module, A) when is_atom(A) -> A;
do_check_type(auto_repair, B) -> bool(B);
do_check_type(backup_module, B) when is_atom(B) -> B;
do_check_type(debug, debug) -> debug;
do_check_type(debug, false) -> none;
do_check_type(debug, none) -> none;
do_check_type(debug, trace) -> trace;
do_check_type(debug, true) -> debug;
do_check_type(debug, verbose) -> verbose;
do_check_type(dir, V) -> filename:absname(V);
do_check_type(dump_log_load_regulation, B) -> bool(B);
do_check_type(dump_log_time_threshold, I) when is_integer(I), I > 0 -> I;
do_check_type(dump_log_update_in_place, B) -> bool(B);
do_check_type(dump_log_write_threshold, I) when is_integer(I), I > 0 -> I;
do_check_type(event_module, A) when is_atom(A) -> A;
do_check_type(ignore_fallback_at_startup, B) -> bool(B);
do_check_type(fallback_error_function, {Mod, Func})
  when is_atom(Mod), is_atom(Func) -> {Mod, Func};
do_check_type(embedded_mnemosyne, B) -> bool(B);
do_check_type(extra_db_nodes, L) when is_list(L) ->
    %% Drop the local node; any element that is not an atom makes the
    %% fun crash, which rejects the whole list.
    Fun = fun(N) when N == node() -> false;
	     (A) when is_atom(A) -> true
	  end,
    lists:filter(Fun, L);
do_check_type(max_wait_for_decision, infinity) -> infinity;
do_check_type(max_wait_for_decision, I) when is_integer(I), I > 0 -> I;
do_check_type(schema_location, M) -> media(M);
do_check_type(core_dir, "false") -> false;
do_check_type(core_dir, false) -> false;
do_check_type(core_dir, Dir) when is_list(Dir) -> Dir;
do_check_type(pid_sort_order, r9b_plain) -> r9b_plain;
do_check_type(pid_sort_order, "r9b_plain") -> r9b_plain;
do_check_type(pid_sort_order, standard) -> standard;
do_check_type(pid_sort_order, "standard") -> standard;
do_check_type(pid_sort_order, _) -> false;
do_check_type(no_table_loaders, N) when is_integer(N), N > 0 -> N;
do_check_type(dc_dump_limit,N) when is_number(N), N > 0 -> N.

%% Accept only the atoms true and false.
bool(true) -> true;
bool(false) -> false.

%% Accept only the schema locations disc, opt_disc and ram.
media(disc) -> disc;
media(opt_disc) -> opt_disc;
media(ram) -> ram.

%% Type-check Val for parameter Env and, when it is acceptable, store
%% the normalised value in the mnesia application environment.
%% Returns the normalised value, or {error, {bad_type, Env, Val}} when
%% the check crashes.
patch_env(Env, Val) ->
    case catch do_check_type(Env, Val) of
	{'EXIT', _Reason} ->
	    {error, {bad_type, Env, Val}};
	NewVal ->
	    application_controller:set_env(mnesia, Env, NewVal),
	    NewVal
    end.

%% Body of the process spawned by handle_info({nodeup, Node}, ...):
%% run the inconsistency check against Node, unlink from the monitor
%% Mon and terminate normally.
detect_partitioned_network(Mon, Node) ->
    detect_inconcistency([Node], running_partitioned_network),
    unlink(Mon),
    exit(normal).

%% Ask every node in Nodes that we regard as down whether it regards us
%% as down too, and report each node that does.  Returns ok, or
%% inconsistent_database if at least one node was reported.
detect_inconcistency([], _Context) ->
    ok;
detect_inconcistency(Nodes, Context) ->
    Downs = [N || N <- Nodes, mnesia_recover:has_mnesia_down(N)],
    {Replies, _BadNodes} =
	rpc:multicall(Downs, ?MODULE, has_remote_mnesia_down, [node()]),
    report_inconsistency(Replies, Context, ok).

%% Called via rpc from detect_inconcistency/2.  Returns {true, node()}
%% when this node has recorded a mnesia_down for Node and no master
%% nodes are set for the schema; otherwise {false, node()}.
has_remote_mnesia_down(Node) ->
    HasDown = mnesia_recover:has_mnesia_down(Node),
    Master  = mnesia_recover:get_master_nodes(schema),
    if
	HasDown == true, Master == [] ->
	    {true, node()};
	true ->
	    {false, node()}
    end.

%% Walk the rpc replies.  Each {true, Node} reply is reported as an
%% inconsistent_database system event and turns the returned status
%% into inconsistent_database; {false, _} and {badrpc, _} replies leave
%% the status unchanged.
report_inconsistency([{true, Node} | Replies], Context, _Status) ->
    %% Oops, Mnesia is already running on the
    %% other node AND we both regard each
    %% other as down. The database is
    %% potentially inconsistent and we have to
    %% tell the applications about it, so
    %% they may perform some clever recovery
    %% action.
    Msg = {inconsistent_database, Context, Node},
    mnesia_lib:report_system_event(Msg),
    report_inconsistency(Replies, Context, inconsistent_database);
report_inconsistency([{false, _Node} | Replies], Context, Status) ->
    report_inconsistency(Replies, Context, Status);
report_inconsistency([{badrpc, _Reason} | Replies], Context, Status) ->
    report_inconsistency(Replies, Context, Status);
report_inconsistency([], _Context, Status) ->
    Status.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -