📄 megaco_messenger.erl
字号:
%% NOTE(review): the clauses below are the tail of a `case` expression whose
%% head (and enclosing function head) lies before the visible part of the
%% file.  They are reproduced unchanged.  The ?SIM tag suggests that the
%% enclosing function is monitor_process_remote -- confirm against the
%% complete file.
        {ok, ControlMonitorPid} ->
            M = ?MODULE,
            F = disconnect_local,
            A = [CH],
            Ref = megaco_monitor:apply_at_exit(M, F, A, ControlMonitorPid),
            case megaco_config:update_conn_info(CH, monitor_ref, Ref) of
                ok ->
                    ?SIM({ok, CH}, monitor_process_remote);
                {error, Reason} ->
                    disconnect(CH, {config_update, Reason}),
                    {error, Reason}
            end;
        {error, Reason} ->
            disconnect(CH, {connect_remote, Reason}),
            {error, Reason};
        {badrpc, Reason} ->
            Reason2 = {'EXIT', Reason},
            disconnect(CH, {connect_remote, Reason2}),
            {error, Reason2}
    end.

%%----------------------------------------------------------------------
%% connect_remote(CH, ControlPid, UserMonitorPid) ->
%%     {ok, ControlMonitorPid} | {error, Reason}
%%
%% Only matches when ControlPid lives on this node and UserMonitorPid on
%% another node (see the guard).  Registers disconnect_remote as an
%% at-exit action for UserMonitorPid, records the remote user node in
%% megaco_config and returns whatever pid is registered locally as
%% megaco_monitor.
%%----------------------------------------------------------------------
connect_remote(CH, ControlPid, UserMonitorPid)
  when node(ControlPid) == node(), node(UserMonitorPid) /= node() ->
    case megaco_config:lookup_local_conn(CH) of
        [_ConnData] ->
            UserNode = node(UserMonitorPid),
            M = ?MODULE,
            F = disconnect_remote,
            A = [CH, UserNode],
            Ref = megaco_monitor:apply_at_exit(M, F, A, UserMonitorPid),
            case megaco_config:connect_remote(CH, UserNode, Ref) of
                ok ->
                    ControlMonitorPid = whereis(megaco_monitor),
                    ?SIM({ok, ControlMonitorPid}, connect_remote);
                {error, Reason} ->
                    {error, Reason}
            end;
        [] ->
            {error, {no_connection, CH}}
    end.

%%----------------------------------------------------------------------
%% disconnect(ConnHandle, DiscoReason) -> ok | {error, Reason}
%%
%% Removes the connection from megaco_config, cancels the at-exit action
%% of the control process and invokes the user's disconnect callback.
%%
%% * On the node of the control process the disconnect is propagated to
%%   every remote user node with rpc:multicall/4.  A remote answer of
%%   `ok' or `{error, {no_connection, _}}' counts as success.
%% * On any other node (with no remote connection data) the control node
%%   is asked to drop this node via disconnect_remote/3.
%%
%% NOTE(review): there is no clause for "not the control node AND
%% RemoteConnData =/= []"; that combination raises case_clause.
%%
%% Anything that is not a #megaco_conn_handle{} yields
%% {error, {bad_conn_handle, BadHandle, Reason}}.
%%----------------------------------------------------------------------
disconnect(ConnHandle, DiscoReason)
  when is_record(ConnHandle, megaco_conn_handle) ->
    case megaco_config:disconnect(ConnHandle) of
        {ok, ConnData, RemoteConnData} ->
            ControlRef = ConnData#conn_data.monitor_ref,
            megaco_monitor:cancel_apply_at_exit(ControlRef),
            handle_disconnect_callback(ConnData, DiscoReason),
            ControlNode = node(ConnData#conn_data.control_pid),
            case ControlNode == node() of
                true ->
                    %% Propagate to remote users
                    CancelFun =
                        fun(RCD) ->
                                UserRef = RCD#remote_conn_data.monitor_ref,
                                megaco_monitor:cancel_apply_at_exit(UserRef),
                                RCD#remote_conn_data.user_node
                        end,
                    Nodes = lists:map(CancelFun, RemoteConnData),
                    M = ?MODULE,
                    F = disconnect,
                    A = [ConnHandle, DiscoReason],
                    case rpc:multicall(Nodes, M, F, A) of
                        {Res, []} ->
                            %% Keep only the answers that signal a real
                            %% failure on the remote side.
                            Check = fun(ok) -> false;
                                       ({error, {no_connection, _CH}}) -> false;
                                       (_) -> true
                                    end,
                            case lists:filter(Check, Res) of
                                [] ->
                                    ok;
                                Bad ->
                                    {error,
                                     {remote_disconnect_error, ConnHandle, Bad}}
                            end;
                        {_Res, Bad} ->
                            {error, {remote_disconnect_crash, ConnHandle, Bad}}
                    end;
                false when RemoteConnData == [] ->
                    %% Propagate to remote control node
                    M = ?MODULE,
                    F = disconnect_remote,
                    A = [DiscoReason, ConnHandle, node()],
                    case rpc:call(ControlNode, M, F, A) of
                        {badrpc, Reason} ->
                            {error, {'EXIT', Reason}};
                        Other ->
                            Other
                    end
            end;
        {error, Reason} ->
            {error, Reason}
    end;
disconnect(BadHandle, Reason) ->
    {error, {bad_conn_handle, BadHandle, Reason}}.

%% At-exit action registered for the control process: disconnects the
%% connection with the reason {no_controlling_process, Reason}.
%% NOTE(review): presumably megaco_monitor prepends the exit reason to the
%% registered argument list ([CH]) when it applies this -- confirm.
disconnect_local(Reason, ConnHandle) ->
    disconnect(ConnHandle, {no_controlling_process, Reason}).

%% Removes UserNode as a remote user of ConnHandle and cancels the at-exit
%% action stored for it.  Returns ok, or {error, {no_connection, ConnHandle}}
%% when megaco_config has no remote connection data for that node.
disconnect_remote(_Reason, ConnHandle, UserNode) ->
    case megaco_config:disconnect_remote(ConnHandle, UserNode) of
        [RCD] ->
            Ref = RCD#remote_conn_data.monitor_ref,
            megaco_monitor:cancel_apply_at_exit(Ref),
            ok;
        [] ->
            {error, {no_connection, ConnHandle}}
    end.

%%----------------------------------------------------------------------
%% Handle incoming message
%%----------------------------------------------------------------------

%% Hands the received bytes over to a freshly spawned, linked process
%% (minimum heap size 5000) and returns ok immediately.
receive_message(ReceiveHandle, ControlPid, SendHandle, Bin) ->
    Opts = [link, {min_heap_size, 5000}],
    spawn_opt(?MODULE, process_received_message,
              [ReceiveHandle, ControlPid, SendHandle, Bin, self()], Opts),
    ok.

%% This function is called via the spawn_opt function with the link
%% option, therefore the unlink before the exit.
process_received_message(ReceiveHandle, ControlPid, SendHandle, Bin, Receiver) ->
    process_received_message(ReceiveHandle, ControlPid, SendHandle, Bin),
    unlink(Receiver),
    exit(normal).

%% Decodes and dispatches one received message; always returns ok.
%%
%% trap_exit is switched on for the duration of the call and the previous
%% flag value is restored afterwards.  For a transaction message the acks
%% are handled first, then the requests: the first request is handled in
%% the calling process, the remaining ones in spawned processes (all
%% requests when the connection is `threaded', otherwise only the "long"
%% requests returned by handle_requests/2).
process_received_message(ReceiveHandle, ControlPid, SendHandle, Bin) ->
    Flag = process_flag(trap_exit, true),
    case prepare_message(ReceiveHandle, SendHandle, Bin, ControlPid) of
        {ok, ConnData, MegaMsg} when is_record(MegaMsg, 'MegacoMessage') ->
            ?rt1(ConnData, "message prepared", [MegaMsg]),
            Mess = MegaMsg#'MegacoMessage'.mess,
            case Mess#'Message'.messageBody of
                {transactions, Transactions} ->
                    {AckList, ReqList} =
                        prepare_trans(ConnData, Transactions, [], []),
                    handle_acks(AckList),
                    case ReqList of
                        [] ->
                            ?rt3("no requests"),
                            ignore;
                        [Req|Reqs] when (ConnData#conn_data.threaded == true) ->
                            [spawn(?MODULE,handle_request,[R]) || R <- Reqs],
                            handle_request(Req);
                        _ ->
                            ?rt3("handle requests"),
                            case handle_requests(ReqList, []) of
                                [] ->
                                    ignore;
                                [LongRequest | More] ->
                                    [spawn(?MODULE, handle_long_request, [LR])
                                     || LR <- More],
                                    handle_long_request(LongRequest)
                            end
                    end;
                {messageError, Error} ->
                    handle_message_error(ConnData, Error)
            end;
        {silent_fail, ConnData, {_Code, Reason, Error}} ->
            ?report_debug(ConnData, Reason, [no_reply, Error]),
            ignore;
        {verbose_fail, ConnData, {Code, Reason, Error}} ->
            ?report_debug(ConnData, Reason, [Error]),
            send_message_error(ConnData, Code, Reason)
    end,
    process_flag(trap_exit, Flag),
    ok.

%%----------------------------------------------------------------------
%% prepare_message(RH, SH, Bin, Pid) ->
%%     {ok, ConnData, MegaMsg}                        |
%%     {silent_fail,  ConnData, {Code, Reason, Error}} |
%%     {verbose_fail, ConnData, {Code, Reason, Error}}
%%
%% Decodes Bin with the encoding module of the receive handle and resolves
%% the connection it belongs to:
%%   * known connection, strict_version set and a version other than the
%%     negotiated one -> reported through the syntax error callback;
%%   * known connection                  -> authentication check;
%%   * unknown connection                -> a temporary connection is set
%%                                          up via connect/4.
%% A decode failure is reported through the syntax error callback.  A
%% receive handle that is not a #megaco_receive_handle{} (or a non-pid
%% Pid) gives a verbose_fail with bad_receive_handle.
%%----------------------------------------------------------------------
prepare_message(RH, SH, Bin, Pid)
  when is_record(RH, megaco_receive_handle), is_pid(Pid) ->
    ?report_trace(RH, "receive bytes", [{bytes, Bin}]),
    EncodingMod    = RH#megaco_receive_handle.encoding_mod,
    EncodingConfig = RH#megaco_receive_handle.encoding_config,
    ProtVersion    = RH#megaco_receive_handle.protocol_version,
    case (catch EncodingMod:decode_message(EncodingConfig, ProtVersion, Bin)) of
        {ok, MegaMsg} when is_record(MegaMsg, 'MegacoMessage') ->
            ?report_trace(RH, "receive message", [{message, MegaMsg}]),
            Mess      = MegaMsg#'MegacoMessage'.mess,
            RemoteMid = Mess#'Message'.mId,
            Version   = Mess#'Message'.version,
            LocalMid  = RH#megaco_receive_handle.local_mid,
            CH        = #megaco_conn_handle{local_mid  = LocalMid,
                                            remote_mid = RemoteMid},
            case megaco_config:lookup_local_conn(CH) of
                %%
                %% Message is not of the negotiated version
                %%
                [#conn_data{protocol_version = NegVersion,
                            strict_version   = true} = ConnData]
                  when NegVersion /= Version ->
                    %% Use already established connection,
                    %% but incorrect version
                    ?rt1(ConnData, "not negotiated version", [Version]),
                    Error = {error, {not_negotiated_version,
                                     NegVersion, Version}},
                    handle_syntax_error_callback(RH, ConnData,
                                                 prepare_error(Error));

                [ConnData] ->
                    %% Use already established connection
                    ?rt1(ConnData, "use already established connection", []),
                    ConnData2 = ConnData#conn_data{send_handle      = SH,
                                                   protocol_version = Version},
                    check_message_auth(CH, ConnData2, MegaMsg, Bin);

                [] ->
                    %% Setup a temporary connection
                    ?rt3("setup a temporary connection"),
                    case connect(RH, RemoteMid, SH, Pid) of
                        {ok, _} ->
                            do_prepare_message(RH, CH, SH, MegaMsg, Pid, Bin);
                        {error, {already_connected, _ConnHandle}} ->
                            do_prepare_message(RH, CH, SH, MegaMsg, Pid, Bin);
                        {error, {connection_refused, ConnData, Reason}} ->
                            Error = prepare_error({error, {connection_refused,
                                                           Reason}}),
                            {verbose_fail, ConnData, Error};
                        {error, Reason} ->
                            ConnData  = fake_conn_data(RH, RemoteMid, SH, Pid),
                            ConnData2 = ConnData#conn_data{
                                          protocol_version = Version},
                            Error     = prepare_error({error, Reason}),
                            {verbose_fail, ConnData2, Error}
                    end
            end;
        Error ->
            ?rt2("decode error", [Error]),
            ConnData = handle_decode_error(Error,
                                           RH, SH, Bin, Pid,
                                           EncodingMod,
                                           EncodingConfig,
                                           ProtVersion),
            handle_syntax_error_callback(RH, ConnData, prepare_error(Error))
    end;
prepare_message(RH, SendHandle, _Bin, ControlPid) ->
    ConnData = fake_conn_data(RH, SendHandle, ControlPid),
    Error    = prepare_error({'EXIT', {bad_receive_handle, RH}}),
    {verbose_fail, ConnData, Error}.

%%----------------------------------------------------------------------
%% handle_decode_error(Error, RH, SH, Bin, Pid, EM, EC, V) -> ConnData
%%
%% Builds the connection data used to report a decode failure.  A second,
%% "mini" decode is attempted to recover the remote mid, so that the error
%% counter of the right connection is incremented and, when the connection
%% is known, its stored connection data is used.  If the mini decode fails
%% as well, the global error counter is incremented and fake connection
%% data is returned.
%%
%% The first clause (unsupported version) deliberately leaves
%% protocol_version untouched; the second clause stores the version found
%% by the mini decode.
%%----------------------------------------------------------------------
handle_decode_error({error, {unsupported_version, _}},
                    #megaco_receive_handle{local_mid = LocalMid} = RH, SH,
                    Bin, Pid, EM, EC, V) ->
    case (catch EM:decode_mini_message(EC, V, Bin)) of
        {ok, #'MegacoMessage'{mess = #'Message'{version = _Ver,
                                                mId     = RemoteMid}}} ->
            ?rt2("erroneous message received", [SH, RemoteMid, _Ver]),
            CH = #megaco_conn_handle{local_mid  = LocalMid,
                                     remote_mid = RemoteMid},
            incNumErrors(CH),
            %% We cannot put the version into conn-data, that will
            %% make the resulting error message impossible to send
            %% (unsupported version)
            case megaco_config:lookup_local_conn(CH) of
                [ConnData] ->
                    ?rt3("known to us"),
                    ConnData#conn_data{send_handle = SH};
                [] ->
                    ?rt3("unknown to us"),
                    ConnData = fake_conn_data(RH, SH, Pid),
                    ConnData#conn_data{conn_handle = CH}
            end;
        _ ->
            ?rt2("erroneous message received", [SH]),
            incNumErrors(),
            fake_conn_data(RH, SH, Pid)
    end;
handle_decode_error(_,
                    #megaco_receive_handle{local_mid = LocalMid} = RH, SH,
                    Bin, Pid, EM, EC, V) ->
    case (catch EM:decode_mini_message(EC, V, Bin)) of
        {ok, #'MegacoMessage'{mess = #'Message'{version = Ver,
                                                mId     = RemoteMid}}} ->
            ?rt2("erroneous message received", [SH, Ver, RemoteMid]),
            CH = #megaco_conn_handle{local_mid  = LocalMid,
                                     remote_mid = RemoteMid},
            incNumErrors(CH),
            case megaco_config:lookup_local_conn(CH) of
                [ConnData] ->
                    ?rt3("known to us"),
                    ConnData#conn_data{send_handle      = SH,
                                       protocol_version = Ver};
                [] ->
                    ?rt3("unknown to us"),
                    ConnData = fake_conn_data(RH, SH, Pid),
                    ConnData#conn_data{conn_handle      = CH,
                                       protocol_version = Ver}
            end;
        _ ->
            ?rt2("erroneous message received", [SH]),
            incNumErrors(),
            fake_conn_data(RH, SH, Pid)
    end.

%%----------------------------------------------------------------------
%% do_prepare_message(RH, CH, SendHandle, MegaMsg, ControlPid, Bin) ->
%%     {ok, ConnData, MegaMsg} | {ReplyTag, ConnData, Reason}
%%
%% Second step for a message that arrived on a temporary connection: runs
%% the authentication check and either keeps the connection (success) or
%% removes it again (failure).  If the connection has vanished in the
%% meantime a silent_fail with reason no_connection is returned.
%%----------------------------------------------------------------------
do_prepare_message(RH, CH, SendHandle, MegaMsg, ControlPid, Bin) ->
    case megaco_config:lookup_local_conn(CH) of
        [ConnData] ->
            case check_message_auth(CH, ConnData, MegaMsg, Bin) of
                {ok, ConnData2, MegaMsg} ->
                    %% Let the connection be permanent
                    {ok, ConnData2, MegaMsg};
                {ReplyTag, ConnData2, Reason} ->
                    %% Remove the temporary connection.
                    %% check_message_auth/4 hands back a copy of the
                    %% connection data with protocol_version replaced by
                    %% the message version, so the result must be bound to
                    %% a fresh variable: matching against the already
                    %% bound ConnData fails with case_clause whenever the
                    %% two versions differ.
                    disconnect(CH, {bad_auth, Reason}),
                    {ReplyTag, ConnData2, Reason}
            end;
        [] ->
            Reason = no_connection,
            disconnect(CH, Reason),
            RemoteMid = CH#megaco_conn_handle.remote_mid,
            ConnData = fake_conn_data(RH, RemoteMid, SendHandle, ControlPid),
            Error = prepare_error({error, Reason}),
            {silent_fail, ConnData, Error}
    end.

%%----------------------------------------------------------------------
%% check_message_auth(ConnHandle, ConnData, MegaMsg, Bin) ->
%%     {ok, ConnData2, MegaMsg} | {verbose_fail, ConnData2, PrepError}
%%
%% ConnData2 is ConnData with protocol_version set to the version of the
%% message.  The message is accepted only when neither the message nor
%% the connection carries authentication data; everything else is
%% answered with an "unauthorized" error descriptor.
%%----------------------------------------------------------------------
check_message_auth(_ConnHandle, ConnData, MegaMsg, Bin) ->
    MsgAuth   = MegaMsg#'MegacoMessage'.authHeader,
    Mess      = MegaMsg#'MegacoMessage'.mess,
    Version   = Mess#'Message'.version,
    ConnData2 = ConnData#conn_data{protocol_version = Version},
    ConnAuth  = ConnData2#conn_data.auth_data,
    ?report_trace(ConnData2, "check message auth", [{bytes, Bin}]),
    if
        MsgAuth == asn1_NOVALUE, ConnAuth == asn1_NOVALUE ->
            ?SIM({ok, ConnData2, MegaMsg}, check_message_auth);
        true ->
            ED = #'ErrorDescriptor'{errorCode = ?megaco_unauthorized,
                                    errorText =
                                        "Authentication is not supported"},
            {verbose_fail, ConnData2, prepare_error({error, ED})}
    end.

%%----------------------------------------------------------------------
%% handle_syntax_error_callback(ReceiveHandle, ConnData, PrepError) ->
%%     {verbose_fail, ConnData, {Code, Reason, Error}} |
%%     {silent_fail,  ConnData, {Code, Reason, Error}}
%%
%% Invokes UserMod:handle_syntax_error(ReceiveHandle, Version, ErrorDesc
%% | UserArgs) and maps its answer:
%%   reply            -> verbose_fail with the prepared error
%%   {reply, ED}      -> verbose_fail with code/text taken from ED
%%   no_reply         -> silent_fail with the prepared error
%%   {no_reply, ED}   -> silent_fail with code/text taken from ED
%%   anything else    -> warning is logged, verbose_fail with the
%%                       prepared error
%%
%% NOTE(review): the closing `end.' of this function lies beyond the
%% visible part of the file; the text below is reproduced unchanged up to
%% the point where the visible part stops.
%%----------------------------------------------------------------------
handle_syntax_error_callback(ReceiveHandle, ConnData, PrepError) ->
    {Code, Reason, Error} = PrepError,
    ErrorDesc = #'ErrorDescriptor'{errorCode = Code, errorText = Reason},
    Version =
        case Error of
            {error, {unsupported_version, UV}} ->
                UV;
            _ ->
                ConnData#conn_data.protocol_version
        end,
    UserMod  = ConnData#conn_data.user_mod,
    UserArgs = ConnData#conn_data.user_args,
    ?report_trace(ReceiveHandle, "callback: syntax error", [ErrorDesc, Error]),
    Res = (catch apply(UserMod, handle_syntax_error,
                       [ReceiveHandle, Version, ErrorDesc | UserArgs])),
    ?report_debug(ReceiveHandle, "return: syntax error",
                  [{return, Res}, ErrorDesc, Error]),
    case Res of
        reply ->
            {verbose_fail, ConnData, PrepError};
        {reply,#'ErrorDescriptor'{errorCode = Code1, errorText = Reason1}} ->
            {verbose_fail, ConnData, {Code1,Reason1,Error}};
        no_reply ->
            {silent_fail, ConnData, PrepError};
        {no_reply,#'ErrorDescriptor'{errorCode=Code2,errorText=Reason2}} ->
            {silent_fail, ConnData, {Code2,Reason2,Error}}; %%% OTP-????
        _ ->
            warning_msg("syntax error callback failed: ~w", [Res]),
            {verbose_fail, ConnData, PrepError}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -