📄 megaco_trans_sender.erl
字号:
M -> warning_msg("received unexpected message (ignoring): " "~n~p", [M]), loop(S, to(To, Start)) after To -> ?d("loop(active,~w) -> timeout - time to send", [To]), handle_send_result( send_msg(S#state.conn_handle, Reqs, Acks) ), loop(S#state{req_sz = 0, reqs = [], acks = []}, Timeout) end;loop(#state{reqs = Reqs, acks = Acks, timeout = Timeout} = S, _To) -> ?d("loop(active) -> timeout [~w, ~w]", [length(Reqs),length(Acks)]), handle_send_result( send_msg(S#state.conn_handle, Reqs, Acks) ), loop(S#state{req_sz = 0, reqs = [], acks = []}, Timeout).%%%-----------------------------------------------------------------%% The request is itself larger then the max size, so first send %% everything we have stored in one message, and then the new request%% in another. %% Note that it does not matter if we with this request %% passed the maxcount limit.%% Note that this message cannot be a re-sent, since %% such a request would have been stored, but sent immediatly.handle_send_req(Tid, Req, #state{conn_handle = CH, req_maxsize = MaxSz, reqs = Reqs, acks = Acks} = S) when size(Req) >= MaxSz -> ?d("handle_send_req -> request bigger then maxsize ~w", [MaxSz]), handle_send_result( send_msg(CH, Reqs, Acks) ), handle_send_result( send_msg(CH, [{Tid, Req}], []) ), {S#state{req_sz = 0, reqs = [], acks = []}, true};%% And handle all the other caseshandle_send_req(Tid, Req, #state{conn_handle = CH, req_sz = ReqSz, req_maxcount = MaxReqs, req_maxsize = MaxSz, reqs = Reqs, acks = Acks} = S) -> case lists:keymember(Tid, 1, Reqs) of true -> %% A re-send, time to send whatever we have in the store ?d("handle_send_req -> was a re-send, so flush",[]), handle_send_result( send_msg(CH, Reqs, Acks) ), {S#state{req_sz = 0, reqs = [], acks = []}, true}; false when length(Reqs) + 1 >= MaxReqs -> %% We finally passed the req-maxcount limit ?d("handle_send_req -> maxcount ~w passed", [MaxReqs]), handle_send_result( send_msg(S#state.conn_handle, [{Tid, Req}|Reqs], Acks) ), {S#state{req_sz = 0, reqs = [], acks = []}, true}; false when size(Req) + ReqSz >= MaxSz -> %% We finally passed the req-maxsize limit ?d("handle_send_req -> maxsize ~w passed", [MaxSz]), handle_send_result( send_msg(S#state.conn_handle, [{Tid, Req}|Reqs], Acks) ), {S#state{req_sz = 0, reqs = [], acks = []}, true}; false -> %% Still not time to send ?d("handle_send_req -> nothing to be sent",[]), {S#state{req_sz = ReqSz + size(Req), reqs = [{Tid, Req}|Reqs]}, false} end. %% We passed the req-maxcount limit: Time to send, atleast some of %% the stuff...handle_send_reqs(Tids, Reqs0, #state{conn_handle = CH, req_maxsize = MaxSz, req_sz = ReqSz, req_maxcount = MaxReqs, reqs = Reqs, acks = Acks} = S) when length(Reqs0) + length(Reqs) >= MaxReqs -> ?d("handle_send_reqs -> maxcount ~w: ~w, ~w", [MaxSz,length(Reqs0),length(Reqs)]), Reqs1 = merge_tids_and_reqs(Tids, Reqs0, []), {NewReqs, NewReqSz} = send_reqs(CH, Reqs1, Acks, Reqs, ReqSz, MaxSz), ?d("handle_send_reqs -> sent:" "~n NewReqSz: ~w" "~n length(NewReqs): ~w", [NewReqSz, length(NewReqs)]), {S#state{req_sz = NewReqSz, reqs = NewReqs, acks = []}, true};%% We did not pass the req-maxcount limit, but we could have passed the %% req-maxsize limit, so maybe send...handle_send_reqs(Tids, Reqs0, #state{conn_handle = CH, req_maxsize = MaxSz, req_sz = ReqSz, reqs = Reqs, acks = Acks} = S) -> ?d("handle_send_reqs -> not maxcount - maybe maxsize (~w)", [MaxSz]), Reqs1 = merge_tids_and_reqs(Tids, Reqs0, []), case maybe_send_reqs(CH, Reqs1, Acks, Reqs, ReqSz, MaxSz, false) of {NewReqs, NewReqSz, true} -> ?d("handle_send_reqs -> sent:" "~n NewReqSz: ~w" "~n length(NewReqs): ~w", [NewReqSz, length(NewReqs)]), {S#state{req_sz = NewReqSz, reqs = NewReqs, acks = []}, true}; {NewReqs, NewReqSz, false} -> ?d("handle_send_reqs -> not sent:" "~n NewReqSz: ~w" "~n length(NewReqs): ~w", [NewReqSz, length(NewReqs)]), {S#state{req_sz = NewReqSz, reqs = NewReqs}, false} end. merge_tids_and_reqs([], [], Reqs) -> Reqs;merge_tids_and_reqs([Tid|Tids], [Req|Reqs], Acc) -> merge_tids_and_reqs(Tids, Reqs, [{Tid,Req}|Acc]).%% We know that we shall send, so if maybe_send_reqs does not,%% we send it our self...send_reqs(CH, Reqs, Acks, Acc, AccSz, MaxSz) -> ?d("send_reqs -> entry when" "~n length(Reqs): ~w" "~n Acks: ~w" "~n length(Acc): ~w" "~n AccSz: ~w", [length(Reqs), Acks, length(Acc), AccSz]), case maybe_send_reqs(CH, Reqs, Acks, Acc, AccSz, MaxSz, false) of {NewReqs, _NewReqSz, false} -> ?d("send_reqs -> nothing sent yet" "~n length(NewReqs): ~w", [length(NewReqs)]), handle_send_result( send_msg(CH, NewReqs, Acks) ), {[], 0}; {NewReqs, NewReqSz, true} -> ?d("send_reqs -> something sent" "~n length(NewReqs): ~w" "~n NewReqSz: ~w", [length(NewReqs), NewReqSz]), {NewReqs, NewReqSz} end.maybe_send_reqs(_CH, [], _Acks, Acc, AccSz, _MaxSz, Sent) -> ?d("maybe_send_reqs -> done when" "~n Sent: ~w" "~n AccSz: ~w" "~n length(Acc): ~w", [Sent, AccSz, length(Acc)]), {Acc, AccSz, Sent};maybe_send_reqs(CH, [{Tid, Req}|Reqs], Acks, Acc, _AccSz, MaxSz, _Sent) when size(Req) >= MaxSz -> %% The request was above the maxsize limit, so first send %% what's in store and the the big request. ?d("maybe_send_reqs -> entry when request [~w] size (~w) > max size" "~n Acks: ~w" "~n length(Acc): ~w", [Tid, size(Req), Acks, length(Acc)]), handle_send_result( send_msg(CH, Acc, Acks) ), handle_send_result( send_msg(CH, [{Tid, Req}], []) ), maybe_send_reqs(CH, Reqs, [], [], 0, MaxSz, true);maybe_send_reqs(CH, [{Tid, Req}|Reqs], Acks, Acc, AccSz, MaxSz, _Sent) when AccSz + size(Req) >= MaxSz -> %% We _did_ pass the maxsize limit with this request, so send ?d("maybe_send_reqs -> entry when sum of requests (~w) > max size" "~n Tid: ~w" "~n Acks: ~w" "~n length(Acc): ~w", [Tid, size(Req) + AccSz, Acks, length(Acc)]), handle_send_result( send_msg(CH, [{Tid, Req}|Acc], Acks) ), maybe_send_reqs(CH, Reqs, [], [], 0, MaxSz, true);maybe_send_reqs(CH, [{Tid, Req}|Reqs], Acks, Acc, AccSz, MaxSz, Sent) -> ?d("maybe_send_reqs -> entry when" "~n Tid: ~w" "~n size(Req): ~w" "~n Acks: ~w" "~n length(Acc): ~w" "~n AccSz: ~w", [Tid, size(Req), Acks, length(Acc), AccSz]), NewAcc = [{Tid,Req}|Acc], NewAccSz = AccSz + size(Req), maybe_send_reqs(CH, Reqs, Acks, NewAcc, NewAccSz, MaxSz, Sent). %%%----------------------------------------------------------------- send_pending(CH, Serial, Reqs, Acks) -> ?d("send_pending -> entry with" "~n Serial: ~w" "~n length(Reqs): ~w" "~n length(Acks): ~w", [Serial, length(Reqs), length(Acks)]), case megaco_config:lookup_local_conn(CH) of [CD] -> TP = #'TransactionPending'{transactionId = Serial}, Pend = {transactionPending, TP}, do_send_msg(CD, Pend, lists:reverse(Reqs), Acks); [] -> ok end.%% We need to check the size of the reply. If the reply itself is %% larger then the max limit, then it is sent in a separate message.send_reply(CH, Reply, MaxSz, _ReqSz, Reqs, Acks) -> ?d("send_reply -> entry with" "~n length(Reqs): ~w" "~n length(Acks): ~w", [length(Reqs), length(Acks)]), case megaco_config:lookup_local_conn(CH) of [CD] when size(Reply) > MaxSz -> handle_send_result( send_msg(CD, lists:reverse(Reqs), Acks) ), Rep = {transactionReply, Reply}, do_send_msg(CD, Rep, [], []); [CD] -> Rep = {transactionReply, Reply}, do_send_msg(CD, Rep, lists:reverse(Reqs), Acks); [] -> ok end.do_send_msg(CD, Trans, [], []) -> Body = {transactions, [Trans]}, Slogan = "send trans reply/pending", ?d("do_send_msg -> ~s", [Slogan]), megaco_messenger_misc:send_body(CD, Slogan, Body);do_send_msg(CD, Trans, Reqs0, []) -> Reqs = [{transactionRequest, Req} || {_, Req} <- Reqs0], Body = {transactions, [Trans|Reqs]}, Slogan = "send trans reply/pending and reqs", ?d("do_send_msg -> ~s", [Slogan]), megaco_messenger_misc:send_body(CD, Slogan, Body);do_send_msg(CD, Trans, [], SerialRanges) -> Acks = make_acks(ranges(SerialRanges), []), Body = {transactions, [Trans, {transactionResponseAck, Acks}]}, Slogan = "send trans reply/pending and acks", ?d("do_send_msg -> ~s", [Slogan]), megaco_messenger_misc:send_body(CD, Slogan, Body);do_send_msg(CD, Trans, Reqs0, SerialRanges) -> Acks = make_acks(ranges(SerialRanges), []), Reqs = [{transactionRequest, Req} || {_, Req} <- Reqs0], Body = {transactions, [Trans, {transactionResponseAck, Acks}|Reqs]}, Slogan = "send trans reply/pending, reqs and acks", ?d("do_send_msg -> ~s", [Slogan]), megaco_messenger_misc:send_body(CD, Slogan, Body).send_msg(_, [], []) -> ok;send_msg(CH, Reqs, Serials) -> case megaco_config:lookup_local_conn(CH) of [ConnData] -> do_send_msg(ConnData, lists:reverse(Reqs), Serials); [] -> ok end.do_send_msg(CD, Reqs0, []) -> ?d("do_send_msg -> entry with" "~n length(Reqs0): ~p", [length(Reqs0)]), Reqs = [{transactionRequest, Req} || {_, Req} <- Reqs0], %% ?d("do_send_msg -> Reqs: ~n~p", [Reqs]), Body = {transactions, Reqs}, megaco_messenger_misc:send_body(CD, "send trans reqs", Body);do_send_msg(CD, [], SerialRanges) -> ?d("do_send_msg -> entry with" "~n SerialRanges: ~p", [SerialRanges]), Acks = make_acks(ranges(SerialRanges), []), %% ?d("do_send_msg -> Reqs: ~n~p", [Reqs]), Body = {transactions, [{transactionResponseAck, Acks}]}, megaco_messenger_misc:send_body(CD, "send trans acks", Body);do_send_msg(CD, Reqs0, SerialRanges) -> ?d("do_send_msg -> entry with" "~n length(Reqs0): ~p" "~n SerialRanges: ~p", [length(Reqs0), SerialRanges]), Acks = make_acks(ranges(SerialRanges), []), Reqs = [{transactionRequest, Req} || {_, Req} <- Reqs0], %% ?d("do_send_msg -> Reqs: ~n~p", [Reqs]), Body = {transactions, [{transactionResponseAck, Acks}|Reqs]}, megaco_messenger_misc:send_body(CD, "send trans reqs and acks", Body).handle_send_result(ok) -> ok;handle_send_result({ok, _}) -> ok;handle_send_result({error, {send_message_cancelled, _Reason}}) -> ok;handle_send_result({error, {send_message_failed, Reason}}) -> error_msg("Failed sending message: ~n ~p", [Reason]), error;handle_send_result(Error) -> error_msg("Failed sending message: ~n ~p", [Error]), error.ranges(L) -> lists:reverse(ranges(lists:sort(L), [], [])).ranges([], Range, Ranges) -> ranges2(Range, Ranges);ranges([S1|Sn], [S2|_] = Range, Ranges) when S1 == (S2+1) -> ranges(Sn, [S1|Range], Ranges);ranges([S|Sn], Range, Ranges) -> ranges(Sn, [S], ranges2(Range, Ranges)).ranges2([], Ranges) -> Ranges;ranges2([S], Ranges) -> [{S,S}|Ranges];ranges2(Range0, Ranges) -> Range = lists:reverse(Range0), [{hd(Range),lists:last(Range)}|Ranges].make_acks([], Acks) -> lists:reverse(Acks);make_acks([{S,S}|SerialRanges], Acks) -> TRA = #'TransactionAck'{firstAck = S}, make_acks(SerialRanges, [TRA|Acks]);make_acks([{F,L}|SerialRanges], Acks) -> TRA = #'TransactionAck'{firstAck = F, lastAck = L}, make_acks(SerialRanges, [TRA|Acks]).%%%-----------------------------------------------------------------to(To, Start) -> To - (t() - Start).%% Time in milli secondst() -> {A,B,C} = erlang:now(), A*1000000000+B*1000+(C div 1000).warning_msg(F, A) -> ?megaco_warning("Transaction sender: " ++ F, A). error_msg(F, A) -> ?megaco_error("Transaction sender: " ++ F, A). %%%-----------------------------------------------------------------%%% System messages handled here%%%-----------------------------------------------------------------system_continue(_Parent, _Dbg, {S,To}) -> loop(S, To).system_terminate(Reason, _Parent, _Dbg, {S, _}) -> #state{conn_handle = CH, reqs = Reqs, acks = Acks} = S, send_msg(CH, Reqs, Acks), exit(Reason).system_code_change(S, _Module, _OLdVsn, _Extra) -> ?d("system_code_change -> entry", []), {ok, S}.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -