📄 megaco_messenger.erl
字号:
case megaco_messenger_misc:send_message(ConnData2, true, Bin) of {ok, _} -> ok; {error, Reason} -> %% Pass it on to the user (via handle_ack) cancel_reply(ConnData2, Rep, Reason) end, prepare_trans(ConnData2, Rest, AckList, ReqList); [#reply{state = aborted} = Rep] -> ?rt3("request resend when already in aborted state"), %% OTP-4956: %% Already aborted so ignore. %% This furthermore means that the abnoxious user at the %% other end has already been informed (pending-limit %% passed => error descriptor sent), but keeps sending... %% %% Shall we perform a cleanup? cancel_reply(ConnData, Rep, aborted), prepare_trans(ConnData, Rest, AckList, ReqList) end.prepare_ack(ConnData, [TA | T], Rest, AckList, ReqList) when is_record(TA, 'TransactionAck') -> First = TA#'TransactionAck'.firstAck, Last = TA#'TransactionAck'.lastAck, TA2 = TA#'TransactionAck'{lastAck = asn1_NOVALUE}, ConnData2 = ConnData#conn_data{serial = First}, AckList2 = do_prepare_ack(ConnData2, TA2, AckList), if Last == asn1_NOVALUE -> prepare_ack(ConnData, T, Rest, AckList2, ReqList); First < Last -> TA3 = TA#'TransactionAck'{firstAck = First + 1}, prepare_ack(ConnData, [TA3 | T], Rest, AckList2, ReqList); First == Last -> prepare_ack(ConnData, T, Rest, AckList2, ReqList); First > Last -> %% Protocol violation from the sender of this ack ?report_important(ConnData, "<ERROR> discard trans", [TA, {error, "firstAck > lastAck"}]), prepare_ack(ConnData, T, Rest, AckList2, ReqList) end;prepare_ack(ConnData, [], Rest, AckList, ReqList) -> prepare_trans(ConnData, Rest, AckList, ReqList).do_prepare_ack(ConnData, T, AckList) -> TransId = to_remote_trans_id(ConnData), case megaco_monitor:lookup_reply(TransId) of [] -> %% The reply has already been garbage collected. Ignore. ?report_trace(ConnData, "discard ack (no receiver)", [T]), AckList; [Rep] when Rep#reply.state == waiting_for_ack -> %% Don't care about Msg and Rep version diff [{ConnData, Rep, T} | AckList]; [_Rep] -> %% Protocol violation from the sender of this ack ?report_important(ConnData, "<ERROR> discard trans", [T, {error, "got ack before reply was sent"}]), AckList end.check_and_maybe_create_pending_limit(infinity, _, _) -> ok;check_and_maybe_create_pending_limit(Limit, Direction, TransId) -> case (catch megaco_config:get_pending_counter(Direction, TransId)) of {'EXIT', _} -> %% Has not been created yet (connect). megaco_config:cre_pending_counter(Direction, TransId, 0), ok; Val when Val =< Limit -> %% Since we have no intention to increment here, it %% is ok to be _at_ the limit ok; _ -> aborted end.check_pending_limit(infinity, _, _) -> {ok, 0};check_pending_limit(Limit, Direction, TransId) -> ?rt2("check pending limit", [Direction, Limit, TransId]), case (catch megaco_config:get_pending_counter(Direction, TransId)) of {'EXIT', _} -> %% This function is only called when we "know" the %% counter to exist. So, the only reason that this %% would happen is of the counter has been removed. %% This only happen if the pending limit has been %% reached. In any case, this is basically the same %% as aborted! ?rt2("check pending limit - exit", []), aborted; Val when Val =< Limit -> %% Since we have no intention to increment here, it %% is ok to be _at_ the limit ?rt2("check pending limit - ok", [Val]), {ok, Val}; _Val -> ?rt2("check pending limit - aborted", [_Val]), aborted end.check_and_maybe_incr_pending_limit(infinity, _, _) -> ok;check_and_maybe_incr_pending_limit(Limit, Direction, TransId) -> %% %% We need this kind of test to detect when we _pass_ the limit %% ?rt2("check and maybe incr pending limit", [Direction, Limit, TransId]), case (catch megaco_config:get_pending_counter(Direction, TransId)) of {'EXIT', _} -> %% Has not been created yet (connect). megaco_config:cre_pending_counter(Direction, TransId, 1), ok; Val when Val > Limit -> ?rt2("check and maybe incr - aborted", [Direction, Val, Limit]), aborted; % Already passed the limit Val -> ?rt2("check and maybe incr - incr", [Direction, Val, Limit]), megaco_config:incr_pending_counter(Direction, TransId), if Val < Limit -> ok; % Still within the limit true -> ?rt2("check and maybe incr - error", [Direction, Val, Limit]), error % Passed the limit end end.%% BUGBUG BUGBUG BUGBUG%% %% Do we know that the Rep is still valid? A previous transaction%% could have taken a lot of time.%% handle_request({ConnData, TransId, T}) -> case handle_request(ConnData, TransId, T) of {pending, _RequestData} -> handle_long_request(ConnData, TransId, T); Else -> Else end.handle_request(ConnData, TransId, T) -> ?report_trace(ConnData, "handle request", [TransId, T]), %% Pending limit: %% Ok, before we begin, lets check that this request %% has not been aborted. I.e. exceeded the pending %% limit, so go check it... #conn_data{sent_pending_limit = Limit} = ConnData, case check_and_maybe_create_pending_limit(Limit, sent, TransId) of ok -> %% Ok so far, now update state case megaco_monitor:lookup_reply(TransId) of [Rep] when is_record(Rep, reply) -> Rep2 = Rep#reply{state = eval_request}, megaco_monitor:insert_reply(Rep2), Actions = T#'TransactionRequest'.actions, {AckAction, SendReply} = handle_request_callback(ConnData, TransId, Actions, T), %% Next step, while we where in the callback function, %% the pending limit could have been exceeded, so check %% it again... do_handle_request(AckAction, SendReply, ConnData, TransId); _ -> %% Ugh? ignore end; aborted -> %% Pending limit %% Already exceeded the limit %% The user does not yet know about this request, so %% don't bother telling that it has been aborted... %% Furthermore, the reply timer has not been started, %% so do the cleanup now ?rt1(ConnData, "pending limit already passed", [TransId]), case megaco_monitor:lookup_reply(TransId) of [Rep] -> cancel_reply(ConnData, Rep, aborted); _ -> ok end, ignore end.do_handle_request(_, ignore, _ConnData, _TransId) -> ?rt1(_ConnData, "ignore: don't reply", [_TransId]), ignore;do_handle_request(_, ignore_trans_request, ConnData, TransId) -> ?rt1(ConnData, "ignore trans request: don't reply", [TransId]), case megaco_monitor:lookup_reply(TransId) of [#reply{} = Rep] -> cancel_reply(ConnData, Rep, ignore); _ -> ignore end;do_handle_request({pending, _RequestData}, {aborted, ignore}, _, _) -> ?rt2("handle request: pending - aborted - ignore => don't reply", []), ignore;do_handle_request({pending, _RequestData}, {aborted, _SendReply}, _, _) -> ?rt2("handle request: pending - aborted => don't reply", []), ignore;do_handle_request({pending, RequestData}, _SendReply, _ConnData, _) -> ?rt2("handle request: pending", [RequestData]), {pending, RequestData};do_handle_request(AckAction, {ok, Bin}, ConnData, TransId) -> ?rt1(ConnData, "handle request - ok", [AckAction, TransId]), case megaco_monitor:lookup_reply(TransId) of [#reply{pending_timer_ref = PendingRef} = Rep] ->%% d("do_handle_request -> found reply record:"%% "~n Rep: ~p", [Rep]), #conn_data{reply_timer = InitTimer, conn_handle = ConnHandle} = ConnData, %% Pending limit update: %% - Cancel the pending timer, if running %% - Delete the pending counter %% megaco_monitor:cancel_apply_after(PendingRef), megaco_config:del_pending_counter(sent, TransId), Method = timer_method(AckAction), {WaitFor, CurrTimer} = megaco_timer:init(InitTimer), OptBin = opt_garb_binary(CurrTimer, Bin), M = ?MODULE, F = reply_timeout, A = [ConnHandle, TransId, CurrTimer], Ref = megaco_monitor:apply_after(Method, M, F, A, WaitFor), Rep2 = Rep#reply{pending_timer_ref = undefined, handler = undefined, bytes = OptBin, state = waiting_for_ack, timer_ref = Ref, ack_action = AckAction}, megaco_monitor:insert_reply(Rep2), % Timing problem?%% d("do_handle_request -> "%% "~n Rep2: ~p", [Rep2]), ignore; _ -> %% Been removed already? ignore end;do_handle_request(_, {error, aborted}, ConnData, TransId) -> ?report_trace(ConnData, "aborted during our absence", [TransId]), case megaco_monitor:lookup_reply(TransId) of [Rep] -> cancel_reply(ConnData, Rep, aborted); _ -> ok end, ignore;do_handle_request(AckAction, {error, Reason}, ConnData, TransId) -> ?report_trace(ConnData, "error", [TransId, Reason]), case megaco_monitor:lookup_reply(TransId) of [Rep] -> Rep2 = Rep#reply{state = waiting_for_ack, ack_action = AckAction}, cancel_reply(ConnData, Rep2, Reason); _ -> ok end, ignore;do_handle_request(AckAction, SendReply, ConnData, TransId) -> ?report_trace(ConnData, "unknown send trans reply result", [TransId, AckAction, SendReply]), ignore. handle_requests([{ConnData, TransId, T} | Rest], Pending) -> ?rt2("handle requests", [TransId]), case handle_request(ConnData, TransId, T) of {pending, RequestData} -> handle_requests(Rest, [{ConnData,TransId,RequestData} | Pending]); _ -> handle_requests(Rest, Pending) end;handle_requests([], Pending) -> ?rt2("handle requests - done", [Pending]), Pending.%% opt_garb_binary(timeout, _Bin) -> garb_binary; % Need msg at restart of timeropt_garb_binary(_Timer, Bin) -> Bin.timer_method(discard_ack) -> apply_method;timer_method(_) -> spawn_method.handle_long_request({ConnData, TransId, RequestData}) -> ?rt2("handle long request", [TransId, RequestData]), %% Pending limit: %% We need to check the pending limit, in case it was %% exceeded before we got this far... %% We dont need to be able to create the counter here, %% since that was done in the handle_request function. #conn_data{sent_pending_limit = Limit} = ConnData, case check_pending_limit(Limit, sent, TransId) of {ok, _} -> handle_long_request(ConnData, TransId, RequestData); _ -> %% Already exceeded the limit ignore end.handle_long_request(ConnData, TransId, RequestData) -> ?report_trace(ConnData, "callback: trans long request", [TransId, {request_data, RequestData}]), case megaco_monitor:lookup_reply(TransId) of [Rep] when is_record(Rep, reply) -> %% Update (possibly) new handler megaco_monitor:insert_reply(Rep#reply{handler = self()}), {AckAction, Res} = handle_long_request_callback(ConnData, TransId, RequestData), do_handle_long_request(AckAction, Res, ConnData, TransId); _ -> %% Been removed already? ignore end.do_handle_long_request(AckAction, {ok, Bin}, ConnData, TransId) -> case megaco_monitor:lookup_reply(TransId) of [Rep] when is_record(Rep, reply) -> Method = timer_method(AckAction), InitTimer = ConnData#conn_data.reply_timer, {WaitFor, CurrTimer} = megaco_timer:init(InitTimer), OptBin = opt_garb_binary(CurrTimer, Bin), ConnHandle = ConnData#conn_data.conn_handle, M = ?MODULE, F = reply_timeout, A = [ConnHandle, Rep#reply.trans_id, CurrTimer], Ref = megaco_monitor:apply_after(Method, M, F, A, WaitFor), Rep2 = Rep#reply{bytes = OptBin, state = waiting_for_ack, timer_ref = Ref, ack_action = AckAction}, megaco_monitor:insert_reply(Rep2); % Timing problem? _ ->
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -