📄 megaco_messenger.erl
字号:
%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved via the world wide web at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
%% AB. All Rights Reserved.''
%%
%%     $Id$
%%
%%----------------------------------------------------------------------
%% Purpose: Send and process a (sequence of) Megaco/H.248 transactions
%%----------------------------------------------------------------------

-module(megaco_messenger).

%% Application internal export
-export([
         process_received_message/4,
         receive_message/4,
         connect/4,
         disconnect/2,
         encode_actions/3,
         call/3,
         cast/3,
         cancel/2,
         request_timeout/2,
         pending_timeout/3,
         reply_timeout/3,
         test_request/5,
         test_reply/5
        ]).

%% MIB stat functions
-export([
         get_stats/0, get_stats/1, get_stats/2,
         reset_stats/0, reset_stats/1
        ]).

%% Misc functions
-export([
         cleanup/2,
         which_requests/1, which_replies/1
        ]).

%% Module internal export
-export([
         process_received_message/5,
         handle_request/1,
         handle_long_request/1,
         connect_remote/3,
         disconnect_local/2,
         disconnect_remote/3,
         send_request_remote/4,
         receive_reply_remote/2
        ]).

-include_lib("megaco/include/megaco.hrl").
-include("megaco_message_internal.hrl").
-include_lib("megaco/src/app/megaco_internal.hrl").

%% N.B. Update cancel/1 with '_' when a new field is added
-record(request,
        {trans_id,
         remote_mid,
         timer_ref,       % {short, Ref} | {long, Ref}
         init_timer,
         init_long_timer,
         curr_timer,
         version,
         bytes,           % {send, Data} | {no_send, Data}, Data = binary() | tuple()
         send_handle,
         user_mod,
         user_args,
         reply_action,    % call | cast
         reply_data
        }).

%% N.B. Update cancel/1 with '_' when a new field is added
-record(reply,
        {
          trans_id,
          local_mid,
          state = prepare,     % prepare | eval_request | waiting_for_ack | aborted
          pending_timer_ref,
          handler = undefined, % pid of the proc executing the callback func
          timer_ref,
          version,
          bytes,
          ack_action,          % discard_ack | {handle_ack, Data}
          send_handle
         }).

-record(trans_id,
        {
          mid,
          serial
         }).

-ifdef(MEGACO_TEST_CODE).
-define(SIM(Other,Where),
        fun(Afun,Bfun) ->
                Kfun = {?MODULE,Bfun},
                case (catch ets:lookup(megaco_test_data, Kfun)) of
                    [{Kfun,Cfun}] ->
                        Cfun(Afun);
                    _ ->
                        Afun
                end
        end(Other,Where)).
-define(TC_AWAIT_CANCEL_EVENT(),
        case megaco_tc_controller:lookup(block_on_cancel) of
            {value, {Tag, Pid}} when is_pid(Pid) ->
                Pid ! {Tag, self()},
                receive
                    {Tag, Pid} ->
                        ok
                end;
            {value, {sleep, To}} when is_integer(To) and (To > 0) ->
                receive after To -> ok end;
            _ ->
                ok
        end).
-define(TC_AWAIT_REPLY_EVENT(Info),
        case megaco_tc_controller:lookup(block_on_reply) of
            {value, {Tag, Pid}} when is_pid(Pid) ->
                Pid ! {Tag, self(), Info},
                receive
                    {Tag, Pid} ->
                        ok
                end;
            _Whatever ->
                %% io:format("Whatever: ~p~n", [Whatever]),
                ok
        end).
-else.
-define(SIM(Other,Where),Other).
-define(TC_AWAIT_CANCEL_EVENT(),ok).
-define(TC_AWAIT_REPLY_EVENT(_),ok).
-endif.

-define(report_pending_limit_exceeded(ConnData),
        ?report_important(ConnData, "<ERROR> pending limit exceeded", [])).

-ifdef(megaco_extended_trace).
-define(rt1(T,F,A),?report_trace(T,F,A)).
-define(rt2(F,A),  ?rt1(ignore,F,A)).
-define(rt3(F),    ?rt2(F,[])).
-else.
-define(rt1(T,F,A),ok).
-define(rt2(F,A),  ok).
-define(rt3(F),    ok).
-endif.


%%----------------------------------------------------------------------
%% SNMP statistics handling functions
%%----------------------------------------------------------------------

%%-----------------------------------------------------------------
%% Func: get_stats/0, get_stats/1, get_stats/2
%% Description: Retrieve statistics (counters). All three variants
%%              delegate to megaco_stats:get_stats, passing the atom
%%              megaco_stats as first argument.
%%-----------------------------------------------------------------
get_stats() ->
    megaco_stats:get_stats(megaco_stats).

get_stats(ConnHandleOrCounter) ->
    megaco_stats:get_stats(megaco_stats, ConnHandleOrCounter).

get_stats(ConnHandle, Counter) ->
    megaco_stats:get_stats(megaco_stats, ConnHandle, Counter).


%%-----------------------------------------------------------------
%% Func: reset_stats/0, reset_stats/1
%% Description: Reset statistics (counters). Both variants delegate
%%              to megaco_stats:reset_stats, passing the atom
%%              megaco_stats as first argument.
%%-----------------------------------------------------------------
reset_stats() ->
    megaco_stats:reset_stats(megaco_stats).

reset_stats(ConnHandleOrCounter) ->
    megaco_stats:reset_stats(megaco_stats, ConnHandleOrCounter).


%%----------------------------------------------------------------------
%% cleanup utility functions
%%----------------------------------------------------------------------

%% cleanup(ConnHandleOrLocalMid, Force) -> ok
%%
%% Deletes reply records belonging to a local mid. The first argument
%% is either a #megaco_conn_handle{} (only its local_mid is used) or
%% the local mid itself.
%%   Force == false: only replies in state 'aborted' are deleted.
%%   Force == true:  every matching reply is deleted.
%% Any other value of Force fails with function_clause.
cleanup(#megaco_conn_handle{local_mid = LocalMid}, Force)
  when is_boolean(Force) ->
    Pat = #reply{trans_id  = '$1',
                 local_mid = LocalMid,
                 state     = '$2',
                 _         = '_'},
    do_cleanup(Pat, Force);
cleanup(LocalMid, Force)
  when is_boolean(Force) ->
    Pat = #reply{trans_id  = '$1',
                 local_mid = LocalMid,
                 state     = '$2',
                 _         = '_'},
    do_cleanup(Pat, Force).

%% Looks up the replies matching Pat; each match is consumed as a
%% two-element list [TransId, State] (the '$1' and '$2' of the pattern).
do_cleanup(Pat, Force) ->
    Match = megaco_monitor:which_replies(Pat),
    Reps  = [{V1, V2} || [V1, V2] <- Match],
    do_cleanup2(Reps, Force).

%% Walks the {TransId, State} list and deletes according to Force.
do_cleanup2([], _) ->
    ok;
do_cleanup2([{TransId, aborted}|T], Force = false) ->
    megaco_monitor:delete_reply(TransId),
    do_cleanup2(T, Force);
do_cleanup2([_|T], Force = false) ->
    %% Not forced and not aborted: leave the reply alone
    do_cleanup2(T, Force);
do_cleanup2([{TransId, _State}|T], Force = true) ->
    megaco_monitor:delete_reply(TransId),
    do_cleanup2(T, Force).


%%----------------------------------------------------------------------
%% which_requests and which_replies utility functions
%%----------------------------------------------------------------------

%% which_requests(ConnHandle) -> [Serial]
%% which_requests(LocalMid)   -> [{ConnHandle, [Serial]}]
%%
%% With a #megaco_conn_handle{}, returns the serials of the request
%% records whose trans_id mid is the local mid and whose remote_mid is
%% the handle's remote mid.
%% With a local mid, returns the serials of all its request records,
%% grouped per connection handle (one handle per remote mid). Groups
%% and the serials inside each group are in ascending term order.
which_requests(#megaco_conn_handle{local_mid  = LocalMid,
                                   remote_mid = RemoteMid}) ->
    Pat1 = #trans_id{mid    = LocalMid,
                     serial = '$1', _ = '_'},
    Pat2 = #request{trans_id   = Pat1,
                    remote_mid = RemoteMid, _ = '_'},
    Match = megaco_monitor:which_requests(Pat2),
    [S || [S] <- Match];
which_requests(LocalMid) ->
    Pat1 = #trans_id{mid    = LocalMid,
                     serial = '$1', _ = '_'},
    Pat2 = #request{trans_id   = Pat1,
                    remote_mid = '$2', _ = '_'},
    Match0 = megaco_monitor:which_requests(Pat2),
    Match1 = [{mk_ch(LocalMid, V2), V1} || [V1, V2] <- Match0],
    %% Sorting puts equal connection handles next to each other so the
    %% grouping below needs only a single pass.
    which_requests1(lists:sort(Match1)).

which_requests1([]) ->
    [];
which_requests1([{CH, S}|T]) ->
    which_requests2(T, CH, [S], []).

%% Groups a sorted [{ConnHandle, Serial}] list. Serials are accumulated
%% in reverse, so every finished group is reversed before it is stored.
which_requests2([], CH, Serials, Reqs) ->
    %% The last group must be reversed like all the others; previously
    %% it was returned in descending order, unlike every other group.
    lists:reverse([{CH, lists:reverse(Serials)}|Reqs]);
which_requests2([{CH, S}|T], CH, Serials, Reqs) ->
    which_requests2(T, CH, [S|Serials], Reqs);
which_requests2([{CH1, S}|T], CH2, Serials, Reqs) ->
    which_requests2(T, CH1, [S], [{CH2, lists:reverse(Serials)}| Reqs]).


%% which_replies(ConnHandle) -> [{Serial, State, Handler}]
%% which_replies(LocalMid)   -> [{ConnHandle, [{Serial, State, Handler}]}]
%%
%% With a #megaco_conn_handle{}, returns serial, state and handler of
%% the reply records whose trans_id mid is the handle's remote mid and
%% whose local_mid is the handle's local mid.
%% With a local mid, returns the same data for all its reply records,
%% grouped per connection handle (one handle per remote mid). Groups
%% and the entries inside each group are in ascending term order.
which_replies(#megaco_conn_handle{local_mid  = LocalMid,
                                  remote_mid = RemoteMid}) ->
    Pat1 = #trans_id{mid    = RemoteMid,
                     serial = '$1', _ = '_'},
    Pat2 = #reply{trans_id  = Pat1,
                  local_mid = LocalMid,
                  state     = '$2',
                  handler   = '$3', _ = '_'},
    Match = megaco_monitor:which_replies(Pat2),
    [{V1, V2, V3} || [V1, V2, V3] <- Match];
which_replies(LocalMid) ->
    Pat1 = #trans_id{mid    = '$1',
                     serial = '$2', _ = '_'},
    Pat2 = #reply{trans_id  = Pat1,
                  local_mid = LocalMid,
                  state     = '$3',
                  handler   = '$4', _ = '_'},
    Match0 = megaco_monitor:which_replies(Pat2),
    Match1 = [{mk_ch(LocalMid,V1),{V2,V3,V4}} || [V1, V2, V3, V4] <- Match0],
    %% Sorting puts equal connection handles next to each other so the
    %% grouping below needs only a single pass.
    which_replies1(lists:sort(Match1)).

which_replies1([]) ->
    [];
which_replies1([{CH, Data}|T]) ->
    which_replies2(T, CH, [Data], []).

%% Groups a sorted [{ConnHandle, Data}] list. Data is accumulated in
%% reverse, so every finished group is reversed before it is stored.
which_replies2([], CH, Datas, Reps) ->
    %% The last group must be reversed like all the others; previously
    %% it was returned in descending order, unlike every other group.
    lists:reverse([{CH, lists:reverse(Datas)}|Reps]);
which_replies2([{CH, Data}|T], CH, Datas, Reps) ->
    which_replies2(T, CH, [Data|Datas], Reps);
which_replies2([{CH1, Data}|T], CH2, Datas, Reps) ->
    which_replies2(T, CH1, [Data], [{CH2, lists:reverse(Datas)}| Reps]).

%% Builds a connection handle from a local and a remote mid.
mk_ch(LM, RM) ->
    #megaco_conn_handle{local_mid = LM, remote_mid = RM}.

%%----------------------------------------------------------------------
%% Register/unregister connections
%%----------------------------------------------------------------------

%% connect(RH, RemoteMid, SendHandle, ControlPid) ->
%%     {ok, ConnHandle} | {error, Reason}
%%
%% RH must be a #megaco_receive_handle{}; anything else yields
%% {error, {bad_receive_handle, RH}}. The connection is first set up
%% through megaco_config:connect/4 and, when that succeeds, handed to
%% do_connect/1, which involves the user callback module.
connect(#megaco_receive_handle{} = RH, RemoteMid, SendHandle, ControlPid) ->
    case megaco_config:connect(RH, RemoteMid, SendHandle, ControlPid) of
        {ok, ConnData} ->
            do_connect(ConnData);
        {error, _Reason} = Error ->
            Error
    end;
connect(BadHandle, _CH, _SendHandle, _ControlPid) ->
    {error, {bad_receive_handle, BadHandle}}.

%% Invokes the user callback handle_connect(ConnHandle, Version | UserArgs).
%%   ok         -> continue with monitor_process/2 on the control pid
%%   error,
%%   {error, #'ErrorDescriptor'{}}
%%              -> disconnect and report connection_refused
%%   other      -> as above, but a warning is logged first; this also
%%                 covers a crashing callback, since the call is
%%                 wrapped in 'catch'
do_connect(CD) ->
    ConnHandle   = CD#conn_data.conn_handle,
    ProtoVersion = CD#conn_data.protocol_version,
    CallbackMod  = CD#conn_data.user_mod,
    CallbackArgs = CD#conn_data.user_args,
    ?report_trace(CD, "callback: connect", []),
    Result = (catch apply(CallbackMod, handle_connect,
                          [ConnHandle, ProtoVersion | CallbackArgs])),
    ?report_debug(CD, "return: connect", [{return, Result}]),
    case Result of
        ok ->
            ?SIM(ok, do_encode),
            monitor_process(ConnHandle, CD#conn_data.control_pid);
        error ->
            megaco_config:disconnect(ConnHandle),
            {error, {connection_refused, CD, error}};
        {error, ED} when is_record(ED,'ErrorDescriptor') ->
            megaco_config:disconnect(ConnHandle),
            {error, {connection_refused, CD, ED}};
        _Other ->
            warning_msg("connect callback failed: ~w", [Result]),
            megaco_config:disconnect(ConnHandle),
            {error, {connection_refused, CD, Result}}
    end.

%% Control process on the local node: registers disconnect_local(CH)
%% with megaco_monitor:apply_at_exit/4 (presumably run when ControlPid
%% exits - confirm in megaco_monitor) and stores the returned reference
%% as monitor_ref in the connection info.
monitor_process(CH, ControlPid) when node(ControlPid) == node() ->
    M = ?MODULE,
    F = disconnect_local,
    A = [CH],
    Ref = megaco_monitor:apply_at_exit(M, F, A, ControlPid),
    case megaco_config:update_conn_info(CH, monitor_ref, Ref) of
        ok ->
            ?SIM({ok, CH}, monitor_process_local);
        {error, Reason} ->
            disconnect(CH, {config_update, Reason}),
            {error, Reason}
    end;
monitor_process(CH, ControlPid) when node(ControlPid) /= node() ->
    RemoteNode = node(ControlPid),
    UserMonitorPid = whereis(megaco_monitor),
    Args = [CH, ControlPid, UserMonitorPid],
    case rpc:call(RemoteNode, ?MODULE, connect_remote, Args) of
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -