⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 et_collector.erl

📁 OTP是开放电信平台的简称
💻 ERL
📖 第 1 页 / 共 3 页
字号:
            Limit2 = incr(Limit, Incr),            iterate(TH, Next, Limit2, Fun, Acc2)    end.incr(Val, Incr) ->    if	Val == infinity    -> Val;	Val == '-infinity' -> Val;	integer(Val)       -> Val + Incr    end.%%----------------------------------------------------------------------%% clear_table(Handle) -> ok%%%% Clear the event table%%%% Handle = collector_pid() | table_handle()%% collector_pid() = pid()%% table_handle() = record(table_handle)%%----------------------------------------------------------------------clear_table(CollectorPid) when pid(CollectorPid) ->    call(CollectorPid, clear_table);clear_table(TH) when record(TH, table_handle) ->    clear_table(TH#table_handle.collector_pid).call(CollectorPid, Request) ->    gen_server:call(CollectorPid, Request, infinity).%%%----------------------------------------------------------------------%%% Callback functions from gen_server%%%----------------------------------------------------------------------%%----------------------------------------------------------------------%% Func: init/1%% Returns: {ok, State}          |%%          {ok, State, Timeout} |%%          ignore               |%%          {stop, Reason}%%----------------------------------------------------------------------init([InitialS, Dict]) ->    process_flag(trap_exit, true),    case InitialS#state.parent_pid of	undefined ->	    ignore;	Pid when pid(Pid) ->	    link(Pid)    end,    Funs = [fun init_tables/1,            fun init_global/1,            fun(S) -> lists:foldl(fun do_dict_insert/2, S, Dict) end],    {ok, lists:foldl(fun(F, S) -> F(S) end, InitialS, Funs)}.init_tables(S) ->     EventTab = ets:new(et_events, [ordered_set, {keypos, 1}, public]),    DictTab  = ets:new(et_dict,   [ordered_set, {keypos, 1}, public]),    S#state{event_tab = EventTab, dict_tab = DictTab}.init_global(S) ->     case S#state.trace_global of        true ->            EventFun = fun(Event, {ok, TH}) -> report(TH, Event) end,            EndFun = fun(Acc) -> Acc end,            Spec = trace_spec_wrapper(EventFun, EndFun, {ok, self()}),            dbg:tracer(process, Spec),            et_selector:change_pattern(S#state.trace_pattern),            net_kernel:monitor_nodes(true),            lists:foreach(fun(N) -> self() ! {nodeup, N} end, nodes()),            S#state{trace_nodes = [node()]};        false ->            S    end.%%----------------------------------------------------------------------%% Func: handle_call/3%% Returns: {reply, Reply, State}          |%%          {reply, Reply, State, Timeout} |%%          {noreply, State}               |%%          {noreply, State, Timeout}      |%%          {stop, Reason, Reply, State}   | (terminate/2 is called)%%          {stop, Reason, State}            (terminate/2 is called)%%----------------------------------------------------------------------handle_call({multicast, Msg}, _From, S) ->    do_multicast(S#state.subscribers, Msg),    {reply, ok, S};handle_call(Msg = {dict_insert, _Key, _Val}, _From, S) ->    S2 = do_dict_insert(Msg, S),    {reply, ok, S2};handle_call(Msg = {dict_delete, _Key}, _From, S) ->    S2 = do_dict_delete(Msg, S),    {reply, ok, S2};handle_call({dict_lookup, Key}, _From, S) ->    Reply = ets:lookup(S#state.dict_tab, Key),    {reply, Reply, S};handle_call({dict_match, Pattern}, _From, S) ->    case catch ets:match_object(S#state.dict_tab, Pattern) of        {'EXIT', _Reason} ->            {reply, [], S};        Matching ->            {reply, Matching, S}    end;handle_call(get_table_handle, _From, S) ->    [{_, TableFilter}] = ets:lookup(S#state.dict_tab, {filter, collector}),    TH = #table_handle{collector_pid = self(),                       event_tab     = S#state.event_tab,                       event_order   = S#state.event_order,                       filter        = TableFilter},    {reply, {ok, TH}, S};handle_call(close, _From, S) ->    case S#state.file of        undefined ->            {reply, {error, file_not_open}, S};        F ->            Reply = disk_log:close(F#file.desc),            S2 = S#state{file = undefined},            {reply, Reply, S2}    end;handle_call({save_event_file, FileName, Options}, _From, S) ->    Default = #file{name      = FileName,                    event_opt = existing,                    file_opt  = write,                    table_opt = keep},    case parse_file_options(Default, Options) of        {ok, F} when record(F, file) ->            case file_open(F) of                {ok, Fd} ->                    F2 = F#file{desc = Fd},                    {Reply2, S3} =                         case F2#file.event_opt of                            %% new ->                            %%     Reply = ok,                            %%     S2 = S#state{file = F},                            %%     {Reply, S2};                            %%                            %% insert() ->                            %%   case S2#state.file of                                %%       undefined ->                            %%           ignore;                            %%       F  ->                            %%           Fd = F#file.desc,                            %%           ok = disk_log:log(Fd, Event)                            %%   end.                            existing ->                                Fun = fun({_, E}, A) -> ok = disk_log:log(Fd, E), A end,                                Tab = S#state.event_tab,                                Reply = tab_iterate(Fun, Tab, ets:first(Tab), ok),                                disk_log:close(Fd),                                {Reply, S}                            %% all ->                            %%     Reply = tab_iterate(WriteFun, Tab, ok),                            %%     S2 = S#state{file = F},                            %%     {Reply, S2}                        end,                    case F2#file.table_opt of                        keep ->                            {reply, Reply2, S3};                        clear ->                            S4 = do_clear_table(S3),                            {reply, Reply2, S4}                    end;                {error, Reason} ->                    {reply, {error, {file_open, Reason}}, S}            end;        {error, Reason} ->            {reply, {error, Reason}, S}    end;handle_call({change_pattern, Pattern}, _From, S) ->    Ns = S#state.trace_nodes,    rpc:multicall(Ns, et_selector, change_pattern, [Pattern]),    Reply = {old_pattern, S#state.trace_pattern},    S2 = S#state{trace_pattern = Pattern},    {reply, Reply, S2};handle_call(clear_table, _From, S) ->    S2 = do_clear_table(S),    {reply, ok, S2};handle_call(stop, _From, S) ->    do_multicast(S#state.subscribers, close),    case S#state.trace_global of        true  -> rpc:multicall(S#state.trace_nodes, dbg, stop, []);        false -> ignore    end,    {stop, shutdown, ok, S};handle_call(Request, From, S) ->    ok = error_logger:format("~p(~p): handle_call(~p, ~p, ~p)~n",                             [?MODULE, self(), Request, From, S]),    {reply, {error, {bad_request, Request}}, S}.%%----------------------------------------------------------------------%% Func: handle_cast/2%% Returns: {noreply, State}          |%%          {noreply, State, Timeout} |%%          {stop, Reason, State}            (terminate/2 is called)%%----------------------------------------------------------------------handle_cast(Msg, S) ->    ok = error_logger:format("~p(~p): handle_cast(~p, ~p)~n",                             [?MODULE, self(), Msg, S]),    {noreply, S}.%%----------------------------------------------------------------------%% Func: handle_info/2%% Returns: {noreply, State}          |%%          {noreply, State, Timeout} |%%          {stop, Reason, State}            (terminate/2 is called)%%----------------------------------------------------------------------handle_info({nodeup, Node}, S) ->    Port     = S#state.trace_port,    MaxQueue = S#state.trace_max_queue,    case rpc:call(Node, ?MODULE, start_trace_port, [{Port, MaxQueue}]) of        {ok, _} ->            listen_on_trace_port(Node, Port, S);        {error, Reason} when Reason == already_started->            ok = error_logger:format("~p(~p): producer ignored(~p:~p):~n    ~p~n",                                     [?MODULE, self(), Node, Port, Reason]),            S2 = S#state{trace_port = Port + 1},            {noreply, S2};        {badrpc, Reason} ->            ok = error_logger:format("~p(~p): producer ignored(~p:~p):~n    ~p~n",                                     [?MODULE, self(), Node, Port, Reason]),            S2 = S#state{trace_port = Port + 1},            {noreply, S2};        {error, Reason} ->            self() ! {nodeup, Node},            ok = error_logger:format("~p(~p): producer retry(~p:~p):~n     ~p~n",                                     [?MODULE, self(), Node, Port, Reason]),            S2 = S#state{trace_port = Port + 1},            {noreply, S2}    end;handle_info({nodedown, Node}, S) ->    {noreply, S#state{trace_nodes = S#state.trace_nodes -- [Node]}};handle_info({register_trace_client, Pid}, S) ->    link(Pid),    {noreply, S};handle_info({'EXIT', Pid, Reason}, S) when Pid == S#state.parent_pid ->    {stop, Reason, S};handle_info(Info = {'EXIT', Pid, _Reason}, S) ->    OldSubscribers = S#state.subscribers,    case lists:member(Pid, OldSubscribers) of        true  ->            S2 = do_dict_delete({dict_delete, {subscriber, Pid}}, S),            {noreply, S2};        false ->            ok = error_logger:format("~p(~p): handle_info(~p, ~p)~n",                                     [?MODULE, self(), Info, S]),            {noreply, S}    end;handle_info(Info, S) ->    ok = error_logger:format("~p(~p): handle_info(~p, ~p)~n",                             [?MODULE, self(), Info, S]),    {noreply, S}.listen_on_trace_port(Node, Port, S) ->    [_Name, Host] = string:tokens(atom_to_list(Node), [$@]),    case catch start_trace_client(self(), ip, {Host, Port}) of        {trace_client_pid, RemotePid} ->            rpc:call(Node, et_selector, change_pattern, [S#state.trace_pattern]),            link(RemotePid),            S2 = S#state{trace_nodes = [Node | S#state.trace_nodes],                         trace_port  = Port + 1},            {noreply, S2};        {'EXIT', Reason} when Reason == already_started->            ok = error_logger:format("~p(~p): consumer ignored(~p:~p): ~p~n",                                     [?MODULE, self(), Node, Port, Reason]),            S2 = S#state{trace_port = Port + 1},            {noreply, S2};        {'EXIT', Reason} ->            self() ! {nodeup, Node},            ok = error_logger:format("~p(~p): consumer retry(~p:~p):~n     ~p~n",                                     [?MODULE, self(), Node, Port, Reason]),            S2 = S#state{trace_port = Port + 1},            {noreply, S2}    end.%%----------------------------------------------------------------------%% Func: terminate/2%% Purpose: Shutdown the server%% Returns: any (ignored by gen_server)%%----------------------------------------------------------------------terminate(Reason, S) ->    Fun = fun(Pid) -> exit(Pid, Reason) end,    lists:foreach(Fun, S#state.subscribers).%%----------------------------------------------------------------------%% Func: code_change/3%% Purpose: Convert process state when code is changed%% Returns: {ok, NewState}%%----------------------------------------------------------------------code_change(_OldVsn, S, _Extra) ->    {ok, S}.%%%----------------------------------------------------------------------%%% Internal functions%%%----------------------------------------------------------------------do_clear_table(S) ->    OldTab = S#state.event_tab,    ets:delete(OldTab),    NewTab = ets:new(et_events, [ordered_set, {keypos, 1}, public]),    S#state{event_tab = NewTab}.do_dict_insert(Msg = {dict_insert, Key = {subscriber, Pid}, Val}, S) when pid(Pid) ->    OldSubscribers = S#state.subscribers,    NewSubscribers =        case lists:member(Pid, OldSubscribers) of            true  ->                OldSubscribers;            false ->                link(Pid),                All = ets:match_object(S#state.dict_tab, '_'),                lists:foreach(fun({K, V}) -> Pid ! {et, {dict_insert, K, V}} end, All),                [Pid | OldSubscribers]        end,    do_multicast(NewSubscribers, Msg),    ets:insert(S#state.dict_tab, {Key, Val}),    S#state{subscribers = NewSubscribers};do_dict_insert(Msg = {dict_insert, Key, Val}, S) ->    do_multicast(S#state.subscribers, Msg),    ets:insert(S#state.dict_tab, {Key, Val}),    S.do_dict_delete(Msg = {dict_delete, Key = {subscriber, Pid}}, S) ->    OldSubscribers = S#state.subscribers,    do_multicast(OldSubscribers, Msg),    ets:delete(S#state.dict_tab, Key),    case lists:member(Pid, OldSubscribers) of	true  ->	    unlink(Pid),	    S#state{subscribers = OldSubscribers -- [Pid]};	false ->	    S    end;do_dict_delete({dict_delete, {filter, collector}}, S) ->    S;do_dict_delete(Msg = {dict_delete, Key}, S) ->    do_multicast(S#state.subscribers, Msg),    ets:delete(S#state.dict_tab, Key),    S.tab_iterate(_Fun, _Tab, '$end_of_table', Acc) ->    Acc;tab_iterate(Fun, Tab, Key, Acc) ->    Acc2 = lists:foldl(Fun, Acc, ets:lookup(Tab, Key)),    tab_iterate(Fun, Tab, ets:next(Tab, Key), Acc2).file_open(F) ->    Fd = make_ref(),    case F#file.file_opt of        write  -> file:rename(F#file.name, F#file.name ++ ".OLD");        append -> ignore    end,    Args = [{file, F#file.name}, {name, Fd},            {repair, true}, {mode, read_write}],    case disk_log:open(Args) of        {ok, _} ->            {ok, Fd};        {repaired, _, _, BadBytes} ->            ok = error_logger:format("~p: Skipped ~p bad bytes in file: ~p~n",                                     [?MODULE, BadBytes, F#file.name]),            {ok, Fd};        {error,Reason} ->            {error,Reason}    end.parse_file_options(F, [H | T]) ->    case H of        existing -> parse_file_options(F#file{event_opt = existing} , T);        %%new      -> parse_file_options(F#file{event_opt = new} , T);        all      -> parse_file_options(F#file{event_opt = all} , T);        write    -> parse_file_options(F#file{file_opt  = write} , T);        append   -> parse_file_options(F#file{file_opt  = append} , T);        keep     -> parse_file_options(F#file{table_opt = keep} , T);        clear    -> parse_file_options(F#file{table_opt = clear} , T);        Bad      -> {error, {bad_file_option, Bad}}    end;parse_file_options(F, []) ->    {ok, F}.do_multicast([Pid | Pids], Msg) ->    Pid ! {et, Msg},    do_multicast(Pids, Msg);do_multicast([], _Msg) ->    ok.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -