⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 megaco_messenger.erl

📁 OTP是开放电信平台的简称
💻 ERL
📖 第 1 页 / 共 5 页
字号:
            case megaco_messenger_misc:send_message(ConnData2, true, Bin) of		{ok, _} ->		    ok;		{error, Reason} ->		    %% Pass it on to the user (via handle_ack)		    cancel_reply(ConnData2, Rep, Reason)	    end,	    prepare_trans(ConnData2, Rest, AckList, ReqList);	[#reply{state = aborted} = Rep] ->	    ?rt3("request resend when already in aborted state"),	    %% OTP-4956:	    %% Already aborted so ignore.	    %% This furthermore means that the abnoxious user at the	    %% other end has already been informed (pending-limit	    %% passed => error descriptor sent), but keeps sending...	    %% 	    %% Shall we perform a cleanup?	    cancel_reply(ConnData, Rep, aborted),	    prepare_trans(ConnData, Rest, AckList, ReqList)        end.prepare_ack(ConnData, [TA | T], Rest, AckList, ReqList)   when is_record(TA, 'TransactionAck') ->    First     = TA#'TransactionAck'.firstAck,    Last      = TA#'TransactionAck'.lastAck,    TA2       = TA#'TransactionAck'{lastAck = asn1_NOVALUE},    ConnData2 = ConnData#conn_data{serial = First},    AckList2  = do_prepare_ack(ConnData2, TA2, AckList),    if        Last == asn1_NOVALUE ->            prepare_ack(ConnData, T, Rest, AckList2, ReqList);        First < Last ->            TA3 = TA#'TransactionAck'{firstAck = First + 1},            prepare_ack(ConnData, [TA3 | T], Rest, AckList2, ReqList);        First == Last ->            prepare_ack(ConnData, T, Rest, AckList2, ReqList);        First > Last ->            %% Protocol violation from the sender of this ack            ?report_important(ConnData, "<ERROR> discard trans",			      [TA, {error, "firstAck > lastAck"}]),	    prepare_ack(ConnData, T, Rest, AckList2, ReqList)    end;prepare_ack(ConnData, [], Rest, AckList, ReqList) ->    prepare_trans(ConnData, Rest, AckList, ReqList).do_prepare_ack(ConnData, T, AckList) ->    TransId = to_remote_trans_id(ConnData),    case megaco_monitor:lookup_reply(TransId) of        [] ->            %% The reply has already been garbage collected. Ignore.            ?report_trace(ConnData, "discard ack (no receiver)", [T]),            AckList;        [Rep] when Rep#reply.state == waiting_for_ack ->            %% Don't care about Msg and Rep version diff            [{ConnData, Rep, T} | AckList];        [_Rep] ->            %% Protocol violation from the sender of this ack            ?report_important(ConnData, "<ERROR> discard trans",			      [T, {error, "got ack before reply was sent"}]),            AckList    end.check_and_maybe_create_pending_limit(infinity, _, _) ->    ok;check_and_maybe_create_pending_limit(Limit, Direction, TransId) ->    case (catch megaco_config:get_pending_counter(Direction, TransId)) of	{'EXIT', _} ->	    %% Has not been created yet (connect).	    megaco_config:cre_pending_counter(Direction, TransId, 0),	    ok;	Val when Val =< Limit ->	    %% Since we have no intention to increment here, it	    %% is ok to be _at_ the limit	    ok;	_ ->	    aborted    end.check_pending_limit(infinity, _, _) ->    {ok, 0};check_pending_limit(Limit, Direction, TransId) ->    ?rt2("check pending limit", [Direction, Limit, TransId]),    case (catch megaco_config:get_pending_counter(Direction, TransId)) of	{'EXIT', _} ->	    %% This function is only called when we "know" the 	    %% counter to exist. So, the only reason that this 	    %% would happen is of the counter has been removed.	    %% This only happen if the pending limit has been 	    %% reached. In any case, this is basically the same 	    %% as aborted!	    ?rt2("check pending limit - exit", []),	    aborted;	Val when Val =< Limit ->	    %% Since we have no intention to increment here, it	    %% is ok to be _at_ the limit	    ?rt2("check pending limit - ok", [Val]),	    {ok, Val};	_Val ->	    ?rt2("check pending limit - aborted", [_Val]),	    aborted    end.check_and_maybe_incr_pending_limit(infinity, _, _) ->    ok;check_and_maybe_incr_pending_limit(Limit, Direction, TransId) ->    %%     %% We need this kind of test to detect when we _pass_ the limit    %%     ?rt2("check and maybe incr pending limit", [Direction, Limit, TransId]),    case (catch megaco_config:get_pending_counter(Direction, TransId)) of	{'EXIT', _} ->	    %% Has not been created yet (connect).	    megaco_config:cre_pending_counter(Direction, TransId, 1),	    ok;	Val when Val > Limit ->	    ?rt2("check and maybe incr - aborted", [Direction, Val, Limit]),	    aborted;      % Already passed the limit	Val ->	    ?rt2("check and maybe incr - incr", [Direction, Val, Limit]),	    megaco_config:incr_pending_counter(Direction, TransId),	    if 		Val < Limit ->		    ok;   % Still within the limit		true ->		    ?rt2("check and maybe incr - error", 			 [Direction, Val, Limit]),		    error % Passed the limit	    end    end.%% BUGBUG BUGBUG BUGBUG%% %% Do we know that the Rep is still valid? A previous transaction%% could have taken a lot of time.%% handle_request({ConnData, TransId, T}) ->    case handle_request(ConnData, TransId, T) of	{pending, _RequestData} ->	    handle_long_request(ConnData, TransId, T);	Else ->	    Else    end.handle_request(ConnData, TransId, T) ->    ?report_trace(ConnData, "handle request", [TransId, T]),    %% Pending limit:    %% Ok, before we begin, lets check that this request     %% has not been aborted. I.e. exceeded the pending     %% limit, so go check it...    #conn_data{sent_pending_limit = Limit} = ConnData,    case check_and_maybe_create_pending_limit(Limit, sent, TransId) of	ok ->	    %% Ok so far, now update state	    case megaco_monitor:lookup_reply(TransId) of		[Rep] when is_record(Rep, reply) ->		    Rep2 = Rep#reply{state = eval_request},		    megaco_monitor:insert_reply(Rep2),		    		    Actions = T#'TransactionRequest'.actions,		    {AckAction, SendReply} = 			handle_request_callback(ConnData, TransId, Actions, T),		    %% Next step, while we where in the callback function,		    %% the pending limit could have been exceeded, so check		    %% it again...		    do_handle_request(AckAction, SendReply, 				      ConnData, TransId);		_ ->		    %% Ugh?		    ignore	    end;	aborted ->	    %% Pending limit	    %% Already exceeded the limit	    %% The user does not yet know about this request, so	    %% don't bother telling that it has been aborted...	    %% Furthermore, the reply timer has not been started,	    %% so do the cleanup now	    ?rt1(ConnData, "pending limit already passed", [TransId]),	    case megaco_monitor:lookup_reply(TransId) of		[Rep] ->		    cancel_reply(ConnData, Rep, aborted);		_ ->		    ok	    end,	    ignore    end.do_handle_request(_, ignore, _ConnData, _TransId) ->    ?rt1(_ConnData, "ignore: don't reply", [_TransId]),    ignore;do_handle_request(_, ignore_trans_request, ConnData, TransId) ->    ?rt1(ConnData, "ignore trans request: don't reply", [TransId]),    case megaco_monitor:lookup_reply(TransId) of	[#reply{} = Rep] ->	    cancel_reply(ConnData, Rep, ignore);	_ ->	    ignore    end;do_handle_request({pending, _RequestData}, {aborted, ignore}, _, _) ->    ?rt2("handle request: pending - aborted - ignore => don't reply", []),    ignore;do_handle_request({pending, _RequestData}, {aborted, _SendReply}, _, _) ->    ?rt2("handle request: pending - aborted => don't reply", []),    ignore;do_handle_request({pending, RequestData}, _SendReply, _ConnData, _) ->    ?rt2("handle request: pending", [RequestData]),    {pending, RequestData};do_handle_request(AckAction, {ok, Bin}, ConnData, TransId) ->    ?rt1(ConnData, "handle request - ok", [AckAction, TransId]),    case megaco_monitor:lookup_reply(TransId) of	[#reply{pending_timer_ref = PendingRef} = Rep] ->%% 	    d("do_handle_request -> found reply record:"%% 	      "~n   Rep: ~p", [Rep]),	    	    #conn_data{reply_timer = InitTimer,		       conn_handle = ConnHandle} = ConnData,	    %% Pending limit update:	    %%   - Cancel the pending timer, if running	    %%   - Delete the pending counter	    %% 	    megaco_monitor:cancel_apply_after(PendingRef),	    megaco_config:del_pending_counter(sent, TransId),	    Method = timer_method(AckAction),	    {WaitFor, CurrTimer} = megaco_timer:init(InitTimer),	    OptBin = opt_garb_binary(CurrTimer, Bin),	    M = ?MODULE,	    F = reply_timeout,	    A = [ConnHandle, TransId, CurrTimer],	    Ref = megaco_monitor:apply_after(Method, M, F, A, 					     WaitFor),	    Rep2 = Rep#reply{pending_timer_ref = undefined,			     handler           = undefined,			     bytes             = OptBin,			     state             = waiting_for_ack,			     timer_ref         = Ref,			     ack_action        = AckAction},	    megaco_monitor:insert_reply(Rep2), % Timing problem?%% 	    d("do_handle_request -> "%% 	      "~n   Rep2: ~p", [Rep2]),	    ignore;	_ ->	    %% Been removed already?	    ignore    end;do_handle_request(_, {error, aborted}, ConnData, TransId) ->    ?report_trace(ConnData, "aborted during our absence", [TransId]),    case megaco_monitor:lookup_reply(TransId) of	[Rep] ->	    cancel_reply(ConnData, Rep, aborted);	_ ->	    ok    end,    ignore;do_handle_request(AckAction, {error, Reason}, ConnData, TransId) ->    ?report_trace(ConnData, "error", [TransId, Reason]),    case megaco_monitor:lookup_reply(TransId) of	[Rep] ->	    Rep2 = Rep#reply{state      = waiting_for_ack,			     ack_action = AckAction},	    cancel_reply(ConnData, Rep2, Reason);	_ ->	    ok    end,    ignore;do_handle_request(AckAction, SendReply, ConnData, TransId) ->    ?report_trace(ConnData, "unknown send trans reply result", 		  [TransId, AckAction, SendReply]),    ignore.    handle_requests([{ConnData, TransId, T} | Rest], Pending) ->    ?rt2("handle requests", [TransId]),    case handle_request(ConnData, TransId, T) of	{pending, RequestData} ->	    handle_requests(Rest, [{ConnData,TransId,RequestData} | Pending]);	_ ->	    handle_requests(Rest, Pending)    end;handle_requests([], Pending) ->    ?rt2("handle requests - done", [Pending]),    Pending.%% opt_garb_binary(timeout, _Bin) -> garb_binary; % Need msg at restart of timeropt_garb_binary(_Timer,   Bin) -> Bin.timer_method(discard_ack) ->    apply_method;timer_method(_) ->    spawn_method.handle_long_request({ConnData, TransId, RequestData}) ->    ?rt2("handle long request", [TransId, RequestData]),    %% Pending limit:    %% We need to check the pending limit, in case it was    %% exceeded before we got this far...    %% We dont need to be able to create the counter here,    %% since that was done in the handle_request function.    #conn_data{sent_pending_limit = Limit} = ConnData,    case check_pending_limit(Limit, sent, TransId) of	{ok, _} ->	    handle_long_request(ConnData, TransId, RequestData);	_ ->	    %% Already exceeded the limit	    ignore    end.handle_long_request(ConnData, TransId, RequestData) ->    ?report_trace(ConnData, "callback: trans long request",		  [TransId, {request_data, RequestData}]),        case megaco_monitor:lookup_reply(TransId) of	[Rep] when is_record(Rep, reply) ->	    %% Update (possibly) new handler	    megaco_monitor:insert_reply(Rep#reply{handler = self()}),	    {AckAction, Res} = 		handle_long_request_callback(ConnData, TransId, RequestData),	    do_handle_long_request(AckAction, Res, ConnData, TransId);	 _ ->	    %% Been removed already?	    ignore    end.do_handle_long_request(AckAction, {ok, Bin}, ConnData, TransId) ->    case megaco_monitor:lookup_reply(TransId) of	[Rep] when is_record(Rep, reply) ->	    Method = timer_method(AckAction),	    InitTimer = ConnData#conn_data.reply_timer,	    {WaitFor, CurrTimer} = megaco_timer:init(InitTimer),	    OptBin = opt_garb_binary(CurrTimer, Bin),	    ConnHandle = ConnData#conn_data.conn_handle,	    M = ?MODULE,	    F = reply_timeout,	    A = [ConnHandle, Rep#reply.trans_id, CurrTimer],	    Ref = megaco_monitor:apply_after(Method, 					     M, F, A, 					     WaitFor),	    Rep2 = Rep#reply{bytes      = OptBin,			     state      = waiting_for_ack,			     timer_ref  = Ref,			     ack_action = AckAction},	    megaco_monitor:insert_reply(Rep2); % Timing problem?	_ ->

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -