📄 et_collector.erl
字号:
Limit2 = incr(Limit, Incr),
            iterate(TH, Next, Limit2, Fun, Acc2)
    end.

%% Advance an iteration limit by Incr. The atoms infinity and '-infinity'
%% are returned unchanged and integers are incremented. Any other value
%% matches no branch and raises if_clause.
incr(Val, Incr) ->
    if
        Val == infinity    -> Val;
        Val == '-infinity' -> Val;
        is_integer(Val)    -> Val + Incr
    end.

%%----------------------------------------------------------------------
%% clear_table(Handle) -> ok
%%
%% Clear the event table
%%
%% Handle = collector_pid() | table_handle()
%% collector_pid() = pid()
%% table_handle() = record(table_handle)
%%----------------------------------------------------------------------

clear_table(CollectorPid) when is_pid(CollectorPid) ->
    call(CollectorPid, clear_table);
clear_table(TH) when is_record(TH, table_handle) ->
    clear_table(TH#table_handle.collector_pid).

%% Synchronous request to the collector process. The call never times out.
call(CollectorPid, Request) ->
    gen_server:call(CollectorPid, Request, infinity).

%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State}          |
%%          {ok, State, Timeout} |
%%          ignore               |
%%          {stop, Reason}
%%----------------------------------------------------------------------

init([InitialS, Dict]) ->
    process_flag(trap_exit, true),
    case InitialS#state.parent_pid of
        undefined ->
            ignore;
        Pid when is_pid(Pid) ->
            link(Pid)
    end,
    %% The state is threaded through each initialisation step in order;
    %% the last step loads the initial dictionary entries.
    Funs = [fun init_tables/1,
            fun init_global/1,
            fun(S) -> lists:foldl(fun do_dict_insert/2, S, Dict) end],
    {ok, lists:foldl(fun(F, S) -> F(S) end, InitialS, Funs)}.

%% Create the event and dictionary ETS tables and store them in the state.
init_tables(S) ->
    EventTab = ets:new(et_events, [ordered_set, {keypos, 1}, public]),
    DictTab  = ets:new(et_dict,   [ordered_set, {keypos, 1}, public]),
    S#state{event_tab = EventTab, dict_tab = DictTab}.

%% When global tracing is enabled: start a dbg tracer that reports events
%% to this collector, activate the trace pattern, subscribe to node
%% up/down messages and queue a nodeup message for every node already
%% connected. Otherwise the state is returned unchanged.
init_global(S) ->
    case S#state.trace_global of
        true ->
            EventFun = fun(Event, {ok, TH}) -> report(TH, Event) end,
            EndFun   = fun(Acc) -> Acc end,
            Spec = trace_spec_wrapper(EventFun, EndFun, {ok, self()}),
            dbg:tracer(process, Spec),
            et_selector:change_pattern(S#state.trace_pattern),
            net_kernel:monitor_nodes(true),
            lists:foreach(fun(N) -> self() ! {nodeup, N} end, nodes()),
            S#state{trace_nodes = [node()]};
        false ->
            S
    end.

%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State}          |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State}               |
%%          {noreply, State, Timeout}      |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------

handle_call({multicast, Msg}, _From, S) ->
    do_multicast(S#state.subscribers, Msg),
    {reply, ok, S};
handle_call(Msg = {dict_insert, _Key, _Val}, _From, S) ->
    S2 = do_dict_insert(Msg, S),
    {reply, ok, S2};
handle_call(Msg = {dict_delete, _Key}, _From, S) ->
    S2 = do_dict_delete(Msg, S),
    {reply, ok, S2};
handle_call({dict_lookup, Key}, _From, S) ->
    Reply = ets:lookup(S#state.dict_tab, Key),
    {reply, Reply, S};
handle_call({dict_match, Pattern}, _From, S) ->
    %% A pattern that makes ets:match_object/2 fail yields an empty result
    %% instead of crashing the collector.
    case catch ets:match_object(S#state.dict_tab, Pattern) of
        {'EXIT', _Reason} ->
            {reply, [], S};
        Matching ->
            {reply, Matching, S}
    end;
handle_call(get_table_handle, _From, S) ->
    %% Asserts that exactly one {filter, collector} entry exists.
    [{_, TableFilter}] = ets:lookup(S#state.dict_tab, {filter, collector}),
    TH = #table_handle{collector_pid = self(),
                       event_tab     = S#state.event_tab,
                       event_order   = S#state.event_order,
                       filter        = TableFilter},
    {reply, {ok, TH}, S};
handle_call(close, _From, S) ->
    case S#state.file of
        undefined ->
            {reply, {error, file_not_open}, S};
        F ->
            Reply = disk_log:close(F#file.desc),
            S2 = S#state{file = undefined},
            {reply, Reply, S2}
    end;
handle_call({save_event_file, FileName, Options}, _From, S) ->
    Default = #file{name      = FileName,
                    event_opt = existing,
                    file_opt  = write,
                    table_opt = keep},
    case parse_file_options(Default, Options) of
        {ok, F} when is_record(F, file) ->
            case file_open(F) of
                {ok, Fd} ->
                    F2 = F#file{desc = Fd},
                    %% NOTE(review): only `existing` is handled here, while
                    %% parse_file_options/2 also accepts `all`; that option
                    %% raises case_clause after the log has been opened.
                    {Reply2, S3} =
                        case F2#file.event_opt of
                            %% new ->
                            %%     Reply = ok,
                            %%     S2 = S#state{file = F},
                            %%     {Reply, S2};
                            %%
                            %% insert() ->
                            %%   case S2#state.file of
                            %%       undefined ->
                            %%           ignore;
                            %%       F  ->
                            %%           Fd = F#file.desc,
                            %%           ok = disk_log:log(Fd, Event)
                            %%   end.
                            existing ->
                                Fun = fun({_, E}, A) -> ok = disk_log:log(Fd, E), A end,
                                Tab = S#state.event_tab,
                                Reply = tab_iterate(Fun, Tab, ets:first(Tab), ok),
                                disk_log:close(Fd),
                                {Reply, S}
                            %% all ->
                            %%     Reply = tab_iterate(WriteFun, Tab, ok),
                            %%     S2 = S#state{file = F},
                            %%     {Reply, S2}
                        end,
                    case F2#file.table_opt of
                        keep ->
                            {reply, Reply2, S3};
                        clear ->
                            S4 = do_clear_table(S3),
                            {reply, Reply2, S4}
                    end;
                {error, Reason} ->
                    {reply, {error, {file_open, Reason}}, S}
            end;
        {error, Reason} ->
            {reply, {error, Reason}, S}
    end;
handle_call({change_pattern, Pattern}, _From, S) ->
    Ns = S#state.trace_nodes,
    rpc:multicall(Ns, et_selector, change_pattern, [Pattern]),
    Reply = {old_pattern, S#state.trace_pattern},
    S2 = S#state{trace_pattern = Pattern},
    {reply, Reply, S2};
handle_call(clear_table, _From, S) ->
    S2 = do_clear_table(S),
    {reply, ok, S2};
handle_call(stop, _From, S) ->
    do_multicast(S#state.subscribers, close),
    case S#state.trace_global of
        true  -> rpc:multicall(S#state.trace_nodes, dbg, stop, []);
        false -> ignore
    end,
    {stop, shutdown, ok, S};
handle_call(Request, From, S) ->
    ok = error_logger:format("~p(~p): handle_call(~p, ~p, ~p)~n",
                             [?MODULE, self(), Request, From, S]),
    {reply, {error, {bad_request, Request}}, S}.

%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------

handle_cast(Msg, S) ->
    ok = error_logger:format("~p(~p): handle_cast(~p, ~p)~n",
                             [?MODULE, self(), Msg, S]),
    {noreply, S}.

%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------

handle_info({nodeup, Node}, S) ->
    Port     = S#state.trace_port,
    MaxQueue = S#state.trace_max_queue,
    %% Every failure branch below advances trace_port by one. Only the
    %% generic {error, _} branch re-queues the nodeup message to retry.
    case rpc:call(Node, ?MODULE, start_trace_port, [{Port, MaxQueue}]) of
        {ok, _} ->
            listen_on_trace_port(Node, Port, S);
        {error, Reason} when Reason == already_started ->
            ok = error_logger:format("~p(~p): producer ignored(~p:~p):~n ~p~n",
                                     [?MODULE, self(), Node, Port, Reason]),
            S2 = S#state{trace_port = Port + 1},
            {noreply, S2};
        {badrpc, Reason} ->
            ok = error_logger:format("~p(~p): producer ignored(~p:~p):~n ~p~n",
                                     [?MODULE, self(), Node, Port, Reason]),
            S2 = S#state{trace_port = Port + 1},
            {noreply, S2};
        {error, Reason} ->
            self() ! {nodeup, Node},
            ok = error_logger:format("~p(~p): producer retry(~p:~p):~n ~p~n",
                                     [?MODULE, self(), Node, Port, Reason]),
            S2 = S#state{trace_port = Port + 1},
            {noreply, S2}
    end;
handle_info({nodedown, Node}, S) ->
    {noreply, S#state{trace_nodes = S#state.trace_nodes -- [Node]}};
handle_info({register_trace_client, Pid}, S) ->
    link(Pid),
    {noreply, S};
handle_info({'EXIT', Pid, Reason}, S) when Pid == S#state.parent_pid ->
    %% The parent died; stop with the same reason.
    {stop, Reason, S};
handle_info(Info = {'EXIT', Pid, _Reason}, S) ->
    OldSubscribers = S#state.subscribers,
    case lists:member(Pid, OldSubscribers) of
        true ->
            S2 = do_dict_delete({dict_delete, {subscriber, Pid}}, S),
            {noreply, S2};
        false ->
            ok = error_logger:format("~p(~p): handle_info(~p, ~p)~n",
                                     [?MODULE, self(), Info, S]),
            {noreply, S}
    end;
handle_info(Info, S) ->
    ok = error_logger:format("~p(~p): handle_info(~p, ~p)~n",
                             [?MODULE, self(), Info, S]),
    {noreply, S}.

%% Start a local trace client for the trace port opened on Node. On
%% success the current trace pattern is activated on Node, the client is
%% linked and Node is added to trace_nodes. Returns a handle_info/2
%% result tuple; every branch advances trace_port by one.
listen_on_trace_port(Node, Port, S) ->
    [_Name, Host] = string:tokens(atom_to_list(Node), [$@]),
    case catch start_trace_client(self(), ip, {Host, Port}) of
        {trace_client_pid, RemotePid} ->
            rpc:call(Node, et_selector, change_pattern, [S#state.trace_pattern]),
            link(RemotePid),
            S2 = S#state{trace_nodes = [Node | S#state.trace_nodes],
                         trace_port  = Port + 1},
            {noreply, S2};
        {'EXIT', Reason} when Reason == already_started ->
            ok = error_logger:format("~p(~p): consumer ignored(~p:~p): ~p~n",
                                     [?MODULE, self(), Node, Port, Reason]),
            S2 = S#state{trace_port = Port + 1},
            {noreply, S2};
        {'EXIT', Reason} ->
            self() ! {nodeup, Node},
            ok = error_logger:format("~p(~p): consumer retry(~p:~p):~n ~p~n",
                                     [?MODULE, self(), Node, Port, Reason]),
            S2 = S#state{trace_port = Port + 1},
            {noreply, S2}
    end.

%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------

terminate(Reason, S) ->
    %% Send an exit signal carrying Reason to every subscriber.
    Fun = fun(Pid) -> exit(Pid, Reason) end,
    lists:foreach(Fun, S#state.subscribers).

%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------

code_change(_OldVsn, S, _Extra) ->
    {ok, S}.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

%% Delete the event table and replace it with a new, empty one created
%% with the same options.
do_clear_table(S) ->
    OldTab = S#state.event_tab,
    ets:delete(OldTab),
    NewTab = ets:new(et_events, [ordered_set, {keypos, 1}, public]),
    S#state{event_tab = NewTab}.

%% Insert a dictionary entry and multicast the insertion to subscribers.
%% For a {subscriber, Pid} key with a pid not yet subscribed, the pid is
%% linked and sent every existing dictionary entry before being added to
%% the subscriber list. Returns the updated state.
do_dict_insert(Msg = {dict_insert, Key = {subscriber, Pid}, Val}, S) when is_pid(Pid) ->
    OldSubscribers = S#state.subscribers,
    NewSubscribers =
        case lists:member(Pid, OldSubscribers) of
            true ->
                OldSubscribers;
            false ->
                link(Pid),
                All = ets:match_object(S#state.dict_tab, '_'),
                lists:foreach(fun({K, V}) -> Pid ! {et, {dict_insert, K, V}} end, All),
                [Pid | OldSubscribers]
        end,
    do_multicast(NewSubscribers, Msg),
    ets:insert(S#state.dict_tab, {Key, Val}),
    S#state{subscribers = NewSubscribers};
do_dict_insert(Msg = {dict_insert, Key, Val}, S) ->
    do_multicast(S#state.subscribers, Msg),
    ets:insert(S#state.dict_tab, {Key, Val}),
    S.

%% Delete a dictionary entry and multicast the deletion to subscribers.
%% Deleting a {subscriber, Pid} key also unlinks and unsubscribes the pid
%% if it was subscribed. A request to delete {filter, collector} is
%% silently ignored. Returns the updated state.
do_dict_delete(Msg = {dict_delete, Key = {subscriber, Pid}}, S) ->
    OldSubscribers = S#state.subscribers,
    do_multicast(OldSubscribers, Msg),
    ets:delete(S#state.dict_tab, Key),
    case lists:member(Pid, OldSubscribers) of
        true ->
            unlink(Pid),
            S#state{subscribers = OldSubscribers -- [Pid]};
        false ->
            S
    end;
do_dict_delete({dict_delete, {filter, collector}}, S) ->
    S;
do_dict_delete(Msg = {dict_delete, Key}, S) ->
    do_multicast(S#state.subscribers, Msg),
    ets:delete(S#state.dict_tab, Key),
    S.

%% Fold Fun over the objects of Tab, starting at Key and following
%% ets:next/2 until '$end_of_table'. Returns the final accumulator.
tab_iterate(_Fun, _Tab, '$end_of_table', Acc) ->
    Acc;
tab_iterate(Fun, Tab, Key, Acc) ->
    Acc2 = lists:foldl(Fun, Acc, ets:lookup(Tab, Key)),
    tab_iterate(Fun, Tab, ets:next(Tab, Key), Acc2).

%% Open a disk_log for the file described by F, using a fresh reference
%% as the log name. With file_opt = write an existing file is first
%% renamed to Name ++ ".OLD"; the result of the rename is not checked.
%% Returns {ok, LogName} | {error, Reason}.
file_open(F) ->
    Fd = make_ref(),
    case F#file.file_opt of
        write  -> file:rename(F#file.name, F#file.name ++ ".OLD");
        append -> ignore
    end,
    Args = [{file, F#file.name}, {name, Fd}, {repair, true}, {mode, read_write}],
    case disk_log:open(Args) of
        {ok, _} ->
            {ok, Fd};
        {repaired, _, _, BadBytes} ->
            ok = error_logger:format("~p: Skipped ~p bad bytes in file: ~p~n",
                                     [?MODULE, BadBytes, F#file.name]),
            {ok, Fd};
        {error, Reason} ->
            {error, Reason}
    end.

%% Fold a list of option atoms into the #file{} record F.
%% Returns {ok, File} | {error, {bad_file_option, Option}}.
parse_file_options(F, [H | T]) ->
    case H of
        existing -> parse_file_options(F#file{event_opt = existing}, T);
        %%new      -> parse_file_options(F#file{event_opt = new}, T);
        all      -> parse_file_options(F#file{event_opt = all}, T);
        write    -> parse_file_options(F#file{file_opt  = write}, T);
        append   -> parse_file_options(F#file{file_opt  = append}, T);
        keep     -> parse_file_options(F#file{table_opt = keep}, T);
        clear    -> parse_file_options(F#file{table_opt = clear}, T);
        Bad      -> {error, {bad_file_option, Bad}}
    end;
parse_file_options(F, []) ->
    {ok, F}.

%% Send {et, Msg} to every pid in the list.
do_multicast([Pid | Pids], Msg) ->
    Pid ! {et, Msg},
    do_multicast(Pids, Msg);
do_multicast([], _Msg) ->
    ok.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -