📄 et_collector.erl
字号:
%% ``The contents of this file are subject to the Erlang Public License,%% Version 1.1, (the "License"); you may not use this file except in%% compliance with the License. You should have received a copy of the%% Erlang Public License along with this software. If not, it can be%% retrieved via the world wide web at http://www.erlang.org/.%% %% Software distributed under the License is distributed on an "AS IS"%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See%% the License for the specific language governing rights and limitations%% under the License.%% %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings%% AB. All Rights Reserved.''%% %% $Id$%%%%----------------------------------------------------------------------%% Purpose: Collect trace events and provide a backing storage%% appropriate for iteration %%-----------------------------------------------------------------------module(et_collector).-behaviour(gen_server).%% External exports-export([ start_link/1, stop/1, report/2, report_event/5, report_event/6, iterate/3, iterate/5, start_trace_client/3, start_trace_port/1, %% load_event_file/2, save_event_file/3, clear_table/1, get_global_pid/0, %% get_table_handle/1, change_pattern/2, make_key/2, dict_insert/3, dict_delete/2, dict_lookup/2, dict_match/2, multicast/2 ]).%% gen_server callbacks-export([init/1,terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).-include("../include/et.hrl").-record(state, {parent_pid, event_tab, dict_tab, event_order, subscribers, file, trace_pattern, trace_port, trace_max_queue, trace_nodes, trace_global}).-record(file, {name, desc, event_opt, file_opt, table_opt}).-record(table_handle, {collector_pid, event_tab, event_order, filter}).-record(trace_ts, {trace_ts, event_ts}).-record(event_ts, {event_ts, trace_ts}).%%%----------------------------------------------------------------------%%% Client side%%%----------------------------------------------------------------------%%----------------------------------------------------------------------%% start_link(Options) -> {ok, CollectorPid} | {error, Reason}%%%% Start a collector process%%%% The collector collects trace events and keeps them ordered by their%% timestamp. The timestamp may either reflect the time when the%% actual trace data was generated (trace_ts) or when the trace data%% was transformed into an event record (event_ts). If the time stamp%% is missing in the trace data (missing timestamp option to%% erlang:trace/4) the trace_ts will be set to the event_ts.%% %% Events are reported to the collector directly with the report%% function or indirectly via one or more trace clients. All reported%% events are first filtered thru the collector filter before they are%% stored by the collector. By replacing the default collector filter%% with a customized dito it is possible to allow any trace data as%% input. The collector filter is a dictionary entry with the%% predefined key {filter, collector} and the value is a fun of%% arity 1. See et_selector:parse_event/2 for interface details,%% such as which erlang:trace/1 tuples that are accepted.%%%% The collector has a built-in dictionary service. Any term may be%% stored as value in the dictionary and bound to a unique key. When%% new values are inserted with an existing key, the new values will%% overwrite the existing ones. Processes may subscribe on dictionary%% updates by using {subscriber, pid()} as dictionary key. All%% dictionary updates will be propagated to the subscriber processes%% matching the pattern {{subscriber, '_'}, '_'} where the first '_'%% is interpreted as a pid().%%%% In global trace mode, the collector will automatically start%% tracing on all connected Erlang nodes. When a node connects, a port%% tracer will be started on that node and a corresponding trace%% client on the collector node. By default the global trace pattern%% is 'max'.%%%% Options = [option()]%% %% option() =%% {parent_pid, pid()} |%% {event_order, event_order()} |%% {dict_insert, {filter, collector}, collector_fun()} |%% {dict_insert, {filter, event_filter_name()}, event_filter_fun()} |%% {dict_insert, {subscriber, pid()}, dict_val()} |%% {dict_insert, dict_key(), dict_val()} |%% {dict_delete, dict_key()} |%% {trace_client, trace_client()} |%% {trace_global, boolean()} | %% {trace_pattern, trace_pattern()} |%% {trace_port, integer()} | %% {trace_max_queue, integer()}%% %% event_order() = trace_ts | event_ts%% trace_pattern() = detail_level() | dbg_match_spec()%% detail_level() = min | max | integer(X) when X =< 0, X >= 100%% trace_client() = %% {event_file, file_name()} |%% {dbg_trace_type(), dbg_trace_parameters()}%% file_name() = string()%% collector_fun() = trace_filter_fun() | event_filter_fun()%% trace_filter_fun() = fun(TraceData) -> false | true | {true, NewEvent}%% event_filter_fun() = fun(Event) -> false | true | {true, NewEvent}%% event_filter_name() = atom()%% TraceData = erlang_trace_data()%% Event = NewEvent = record(event)%% dict_key() = term()%% dict_val() = term()%%%% CollectorPid = pid()%% Reason = term()%%----------------------------------------------------------------------start_link(Options) -> case parse_opt(Options, default_state(), [], []) of {ok, S, Dict2, Clients} when S#state.trace_global == false -> case gen_server:start_link(?MODULE, [S, Dict2], []) of {ok, Pid} when S#state.parent_pid /= self() -> unlink(Pid), start_clients(Pid, Clients); {ok,Pid} -> start_clients(Pid, Clients); {error, Reason} -> {error, Reason} end; {ok, S, Dict2, Clients} when S#state.trace_global == true -> case gen_server:start_link({global, ?MODULE}, ?MODULE, [S, Dict2], []) of {ok, Pid} when S#state.parent_pid /= self() -> unlink(Pid), start_clients(Pid, Clients); {ok,Pid} -> start_clients(Pid, Clients); {error, Reason} -> {error, Reason} end; {error, Reason} -> {error, Reason} end.default_state() -> #state{parent_pid = self(), event_order = trace_ts, subscribers = [], trace_global = false, trace_pattern = undefined, trace_nodes = [], trace_port = 4711, trace_max_queue = 50}.parse_opt([], S, Dict, Clients) -> {Mod, Pattern} = et_selector:make_pattern(S#state.trace_pattern), Fun = fun(E) -> et_selector:parse_event(Mod, E) end, Default = {dict_insert, {filter, collector}, Fun}, {ok, S#state{trace_pattern = {Mod, Pattern}}, [Default | Dict], Clients};parse_opt([H | T], S, Dict, Clients) -> case H of {parent_pid, Parent} when Parent == undefined -> parse_opt(T, S#state{parent_pid = Parent}, Dict, Clients); {parent_pid, Parent} when pid(Parent) -> parse_opt(T, S#state{parent_pid = Parent}, Dict, Clients); {event_order, Order} when Order == trace_ts -> parse_opt(T, S#state{event_order = Order}, Dict, Clients); {event_order, Order} when Order == event_ts -> parse_opt(T, S#state{event_order = Order}, Dict, Clients); {dict_insert, {filter, Name}, Fun} -> if atom(Name), function(Fun) -> parse_opt(T, S, Dict ++ [H], Clients); true -> {error, {bad_option, H}} end; {dict_insert, {subscriber, Pid}, _Val} -> if pid(Pid) -> parse_opt(T, S, Dict ++ [H], Clients); true -> {error, {bad_option, H}} end; {dict_insert, _Key, _Val} -> parse_opt(T, S, Dict ++ [H], Clients); {dict_delete, _Key} -> parse_opt(T, S, Dict ++ [H], Clients); {trace_client, Client = {_, _}} -> parse_opt(T, S, Dict, Clients ++ [Client]); {trace_global, Bool} when Bool == false -> parse_opt(T, S#state{trace_global = Bool}, Dict, Clients); {trace_global, Bool} when Bool == true -> parse_opt(T, S#state{trace_global = Bool}, Dict, Clients); {trace_pattern, {Mod, _} = Pattern} when atom(Mod) -> parse_opt(T, S#state{trace_pattern = Pattern}, Dict, Clients); {trace_pattern, undefined = Pattern} -> parse_opt(T, S#state{trace_pattern = Pattern}, Dict, Clients); {trace_port, Port} when integer(Port) -> parse_opt(T, S#state{trace_port = Port}, Dict, Clients); {trace_max_queue, MaxQueue} when integer(MaxQueue) -> parse_opt(T, S#state{trace_port = MaxQueue}, Dict, Clients); Bad -> {error, {bad_option, Bad}} end;parse_opt(BadList, _S, _Dict, _Clients) -> {error, {bad_option_list, BadList}}.start_clients(CollectorPid, [{Type, Parameters} | T]) -> start_trace_client(CollectorPid, Type, Parameters), start_clients(CollectorPid, T);start_clients(CollectorPid, []) -> {ok, CollectorPid}.%%----------------------------------------------------------------------%% stop(CollectorPid) -> ok%%%% Stop a collector process%%%% CollectorPid = pid()%%----------------------------------------------------------------------stop(CollectorPid) -> call(CollectorPid, stop).%%----------------------------------------------------------------------%% save_event_file(CollectorPid, FileName, Options) -> ok | {error, Reason}%%%% Saves the events to a file%% %% CollectorPid = pid()%% FileName = string()%% Options = [option()]%% Reason = term()%%%% option() = event_option() | file_option() | table_option()%% event_option() = existing%% file_option() = write | append%% table_option() = keep | clear%%%% By default the currently stored events (existing) are%% written to a brand new file (write) and the events are%% kept (keep) after they have been written to the file.%%%% Instead of keeping the events after writing them to file,%% it is possible to remove all stored events after they%% have successfully written to file (clear).%% %% The options defaults to existing, write and keep.%%----------------------------------------------------------------------save_event_file(CollectorPid, FileName, Options) -> call(CollectorPid, {save_event_file, FileName, Options}).%%----------------------------------------------------------------------%% load_event_file(CollectorPid, FileName) ->{ok, BadBytes} | exit(Reason)%%%% Load the event table from a file%% %% CollectorPid = pid()%% FileName = string()%% BadBytes = integer(X) where X >= 0%% Reason = term()%%----------------------------------------------------------------------load_event_file(CollectorPid, FileName) -> Fd = make_ref(), Args = [{file, FileName}, {name, Fd}, {repair, true}, {mode, read_only}], Fun = fun(Event, {ok, TH}) -> report(TH, Event) end, case disk_log:open(Args) of {ok, _} -> do_load_event_file(Fun, Fd, start, {ok, CollectorPid}, FileName, 0); {repaired, _, _, BadBytes} -> do_load_event_file(Fun, Fd, start, {ok, CollectorPid}, FileName, BadBytes); {error, Reason} -> exit({disk_log_open, FileName, Reason}) end.do_load_event_file(Fun, Fd, Cont, Acc, FileName, BadBytes) -> case disk_log:chunk(Fd, Cont) of eof -> {ok, BadBytes}; {error, Reason} -> exit({bad_disk_log_chunk, FileName, Reason}); {Cont2, Events} -> Acc2 = lists:foldl(Fun, Acc, Events), do_load_event_file(Fun, Fd, Cont2, Acc2, FileName, BadBytes); {Cont2, Events, More} -> Acc2 = lists:foldl(Fun, Acc, Events), do_load_event_file(Fun, Fd, Cont2, Acc2, FileName, BadBytes + More) end.%%----------------------------------------------------------------------%% report(Handle, TraceOrEvent)%%%% Report an event to the collector%%%% All events are filtered thru the collector filter, which%% optionally may transform or discard the event. The first%% call should use the pid of the collector process as%% report handle, while subsequent calls should use the%% table handle.%%%% Handle = Initial | Continuation%% Initial = collector_pid()%% collector_pid() = pid()%% Continuation = record(table_handle)%%%% TraceOrEvent = record(event) | dbg_trace_tuple() | end_of_trace%% Reason = term()%% %% Returns: {ok, Continuation} | exit(Reason)%%----------------------------------------------------------------------report(CollectorPid, TraceOrEvent) when pid(CollectorPid) -> case get_table_handle(CollectorPid) of {ok, TH} when record(TH, table_handle) -> report(TH, TraceOrEvent); {error, Reason} -> exit(Reason) end;report(TH, TraceOrEvent) when record(TH, table_handle) -> Fun = TH#table_handle.filter, case Fun(TraceOrEvent) of false -> {ok, TH}; true when record(TraceOrEvent, event) -> Key = make_key(TH, TraceOrEvent), case catch ets:insert(TH#table_handle.event_tab, {Key, TraceOrEvent}) of true -> {ok, TH}; {'EXIT', _Reason} -> %% Refresh the report handle and try again report(TH#table_handle.collector_pid, TraceOrEvent) end; {true, Event} when record(Event, event) -> Key = make_key(TH, Event), case catch ets:insert(TH#table_handle.event_tab, {Key, Event}) of true -> {ok, TH}; {'EXIT', _Reason} -> %% Refresh the report handle and try again report(TH#table_handle.collector_pid, TraceOrEvent) end; BadEvent -> TS = erlang:now(), Contents = [{trace, TraceOrEvent}, {reason, BadEvent}, {filter, Fun}], Event = #event{detail_level = 0, trace_ts = TS, event_ts = TS, from = bad_filter, to = bad_filter, label = bad_filter, contents = Contents}, Key = make_key(TH, Event), case catch ets:insert(TH#table_handle.event_tab, {Key, Event}) of true -> {ok, TH}; {'EXIT', _Reason} -> %% Refresh the report handle and try again report(TH#table_handle.collector_pid, TraceOrEvent) end end;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -