⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 megaco_trans_sender.erl

📁 OTP是开放电信平台的简称
💻 ERL
📖 第 1 页 / 共 2 页
字号:
	M ->	    warning_msg("received unexpected message (ignoring): "			"~n~p", [M]),	    loop(S, to(To, Start))    after To ->	    ?d("loop(active,~w) -> timeout - time to send", [To]),	    handle_send_result( send_msg(S#state.conn_handle, Reqs, Acks) ),	    loop(S#state{req_sz = 0, reqs = [], acks = []}, Timeout)    end;loop(#state{reqs = Reqs, acks = Acks, timeout = Timeout} = S, _To) ->    ?d("loop(active) -> timeout [~w, ~w]", [length(Reqs),length(Acks)]),    handle_send_result( send_msg(S#state.conn_handle, Reqs, Acks) ),    loop(S#state{req_sz = 0, reqs = [], acks = []}, Timeout).%%%-----------------------------------------------------------------%% The request is itself larger then the max size, so first send %% everything we have stored in one message, and then the new request%% in another. %% Note that it does not matter if we with this request %% passed the maxcount limit.%% Note that this message cannot be a re-sent, since %% such a request would have been stored, but sent immediatly.handle_send_req(Tid, Req, 		#state{conn_handle = CH, 		       req_maxsize = MaxSz, reqs = Reqs, acks = Acks} = S)   when size(Req) >= MaxSz ->    ?d("handle_send_req -> request bigger then maxsize ~w", [MaxSz]),    handle_send_result( send_msg(CH, Reqs, Acks) ),    handle_send_result( send_msg(CH, [{Tid, Req}], []) ),    {S#state{req_sz = 0, reqs = [], acks = []}, true};%% And handle all the other caseshandle_send_req(Tid, Req, 		#state{conn_handle  = CH, req_sz = ReqSz, 		       req_maxcount = MaxReqs, req_maxsize = MaxSz, 		       reqs = Reqs, acks = Acks} = S) ->    case lists:keymember(Tid, 1, Reqs) of	true ->	    %% A re-send, time to send whatever we have in the store	    ?d("handle_send_req -> was a re-send, so flush",[]),	    handle_send_result( send_msg(CH, Reqs, Acks) ),	    {S#state{req_sz = 0, reqs = [], acks = []}, true};	false when length(Reqs) + 1 >= MaxReqs ->	    %% We finally passed the req-maxcount limit	    ?d("handle_send_req -> maxcount ~w passed", [MaxReqs]),	    handle_send_result( 	      send_msg(S#state.conn_handle, [{Tid, Req}|Reqs], Acks)	     ),	    {S#state{req_sz = 0, reqs = [], acks = []}, true};	false when size(Req) + ReqSz >= MaxSz ->	    %% We finally passed the req-maxsize limit	    ?d("handle_send_req -> maxsize ~w passed", [MaxSz]),	    handle_send_result( 	      send_msg(S#state.conn_handle, [{Tid, Req}|Reqs], Acks)	     ),	    {S#state{req_sz = 0, reqs = [], acks = []}, true};	false ->	    %% Still not time to send	    ?d("handle_send_req -> nothing to be sent",[]),	    {S#state{req_sz = ReqSz + size(Req), reqs = [{Tid, Req}|Reqs]}, 	     false}    end.	    %% We passed the req-maxcount limit: Time to send, atleast some of %% the stuff...handle_send_reqs(Tids, Reqs0, 		 #state{conn_handle = CH,			req_maxsize = MaxSz, req_sz = ReqSz, 			req_maxcount = MaxReqs, reqs = Reqs, acks = Acks} = S)   when length(Reqs0) + length(Reqs) >= MaxReqs ->    ?d("handle_send_reqs -> maxcount ~w: ~w, ~w", 	[MaxSz,length(Reqs0),length(Reqs)]),    Reqs1 = merge_tids_and_reqs(Tids, Reqs0, []),    {NewReqs, NewReqSz} = send_reqs(CH, Reqs1, Acks, Reqs, ReqSz, MaxSz),    ?d("handle_send_reqs -> sent:"	"~n   NewReqSz:        ~w"	"~n   length(NewReqs): ~w", [NewReqSz, length(NewReqs)]),    {S#state{req_sz = NewReqSz, reqs = NewReqs, acks = []}, true};%% We did not pass the req-maxcount limit, but we could have passed the %% req-maxsize limit, so maybe send...handle_send_reqs(Tids, Reqs0, #state{conn_handle = CH,				     req_maxsize = MaxSz, req_sz = ReqSz, 				     reqs = Reqs, acks = Acks} = S) ->    ?d("handle_send_reqs -> not maxcount - maybe maxsize (~w)", [MaxSz]),    Reqs1 = merge_tids_and_reqs(Tids, Reqs0, []),        case maybe_send_reqs(CH, Reqs1, Acks, Reqs, ReqSz, MaxSz, false) of	{NewReqs, NewReqSz, true} ->	    ?d("handle_send_reqs -> sent:"		"~n   NewReqSz:        ~w"		"~n   length(NewReqs): ~w", [NewReqSz, length(NewReqs)]),	    {S#state{req_sz = NewReqSz, reqs = NewReqs, acks = []}, true};	{NewReqs, NewReqSz, false} ->	    ?d("handle_send_reqs -> not sent:"		"~n   NewReqSz:        ~w"		"~n   length(NewReqs): ~w", [NewReqSz, length(NewReqs)]),	    {S#state{req_sz = NewReqSz, reqs = NewReqs}, false}    end.    merge_tids_and_reqs([], [], Reqs) ->    Reqs;merge_tids_and_reqs([Tid|Tids], [Req|Reqs], Acc) ->    merge_tids_and_reqs(Tids, Reqs, [{Tid,Req}|Acc]).%% We know that we shall send, so if maybe_send_reqs does not,%% we send it our self...send_reqs(CH, Reqs, Acks, Acc, AccSz, MaxSz) ->    ?d("send_reqs -> entry when"	"~n   length(Reqs): ~w"	"~n   Acks:         ~w"	"~n   length(Acc):  ~w"	"~n   AccSz:        ~w", [length(Reqs), Acks, length(Acc), AccSz]),    case maybe_send_reqs(CH, Reqs, Acks, Acc, AccSz, MaxSz, false) of	{NewReqs, _NewReqSz, false} ->	    ?d("send_reqs -> nothing sent yet"		"~n   length(NewReqs): ~w", [length(NewReqs)]),	    handle_send_result( send_msg(CH, NewReqs, Acks) ),	    {[], 0};	{NewReqs, NewReqSz, true} ->	    ?d("send_reqs -> something sent"		"~n   length(NewReqs): ~w"		"~n   NewReqSz:        ~w", [length(NewReqs), NewReqSz]),	    {NewReqs, NewReqSz}    end.maybe_send_reqs(_CH, [], _Acks, Acc, AccSz, _MaxSz, Sent) ->    ?d("maybe_send_reqs -> done when"	"~n   Sent:        ~w"	"~n   AccSz:       ~w"	"~n   length(Acc): ~w", [Sent, AccSz, length(Acc)]),    {Acc, AccSz, Sent};maybe_send_reqs(CH, [{Tid, Req}|Reqs], Acks, Acc, _AccSz, MaxSz, _Sent)   when size(Req) >= MaxSz ->    %% The request was above the maxsize limit, so first send     %% what's in store and the the big request.    ?d("maybe_send_reqs -> entry when request [~w] size (~w) > max size"	"~n   Acks:        ~w"	"~n   length(Acc): ~w", [Tid, size(Req), Acks, length(Acc)]),    handle_send_result( send_msg(CH, Acc, Acks) ),    handle_send_result( send_msg(CH, [{Tid, Req}], []) ),    maybe_send_reqs(CH, Reqs, [], [], 0, MaxSz, true);maybe_send_reqs(CH, [{Tid, Req}|Reqs], Acks, Acc, AccSz, MaxSz, _Sent)   when AccSz + size(Req) >= MaxSz ->    %% We _did_ pass the maxsize limit with this request, so send     ?d("maybe_send_reqs -> entry when sum of requests (~w) > max size"	"~n   Tid:         ~w"	"~n   Acks:        ~w"	"~n   length(Acc): ~w", [Tid, size(Req) + AccSz, Acks, length(Acc)]),    handle_send_result( send_msg(CH, [{Tid, Req}|Acc], Acks) ),    maybe_send_reqs(CH, Reqs, [], [], 0, MaxSz, true);maybe_send_reqs(CH, [{Tid, Req}|Reqs], Acks, Acc, AccSz, MaxSz, Sent) ->    ?d("maybe_send_reqs -> entry when"	"~n   Tid:         ~w"	"~n   size(Req):   ~w"	"~n   Acks:        ~w"	"~n   length(Acc): ~w"	"~n   AccSz:       ~w", [Tid, size(Req), Acks, length(Acc), AccSz]),    NewAcc   = [{Tid,Req}|Acc],     NewAccSz = AccSz + size(Req),     maybe_send_reqs(CH, Reqs, Acks, NewAcc, NewAccSz, MaxSz, Sent).    %%%-----------------------------------------------------------------    send_pending(CH, Serial, Reqs, Acks) ->    ?d("send_pending -> entry with"	"~n   Serial:       ~w"	"~n   length(Reqs): ~w"	"~n   length(Acks): ~w", [Serial, length(Reqs), length(Acks)]),    case megaco_config:lookup_local_conn(CH) of	[CD] ->	    TP = #'TransactionPending'{transactionId = Serial},	    Pend = {transactionPending, TP},	    do_send_msg(CD, Pend, lists:reverse(Reqs), Acks);	[] ->	    ok    end.%% We need to check the size of the reply. If the reply itself is %% larger then the max limit, then it is sent in a separate message.send_reply(CH, Reply, MaxSz, _ReqSz, Reqs, Acks) ->    ?d("send_reply -> entry with"	"~n   length(Reqs): ~w"	"~n   length(Acks): ~w", [length(Reqs), length(Acks)]),    case megaco_config:lookup_local_conn(CH) of	[CD] when size(Reply) > MaxSz ->	    handle_send_result( send_msg(CD, lists:reverse(Reqs), Acks) ),	    Rep = {transactionReply, Reply},	    do_send_msg(CD, Rep, [], []);	[CD] ->	    Rep = {transactionReply, Reply},	    do_send_msg(CD, Rep, lists:reverse(Reqs), Acks);	[] ->	    ok    end.do_send_msg(CD, Trans, [], []) ->    Body   = {transactions, [Trans]},    Slogan = "send trans reply/pending",     ?d("do_send_msg -> ~s", [Slogan]),    megaco_messenger_misc:send_body(CD, Slogan, Body);do_send_msg(CD, Trans, Reqs0, []) ->    Reqs   = [{transactionRequest, Req} || {_, Req} <- Reqs0],    Body   = {transactions, [Trans|Reqs]},    Slogan = "send trans reply/pending and reqs",     ?d("do_send_msg -> ~s", [Slogan]),    megaco_messenger_misc:send_body(CD, Slogan, Body);do_send_msg(CD, Trans, [], SerialRanges) ->    Acks   = make_acks(ranges(SerialRanges), []),    Body   = {transactions, [Trans, {transactionResponseAck, Acks}]},    Slogan = "send trans reply/pending and acks",     ?d("do_send_msg -> ~s", [Slogan]),    megaco_messenger_misc:send_body(CD, Slogan, Body);do_send_msg(CD, Trans, Reqs0, SerialRanges) ->    Acks   = make_acks(ranges(SerialRanges), []),    Reqs   = [{transactionRequest, Req} || {_, Req} <- Reqs0],    Body   = {transactions, [Trans, {transactionResponseAck, Acks}|Reqs]},    Slogan = "send trans reply/pending, reqs and acks",     ?d("do_send_msg -> ~s", [Slogan]),    megaco_messenger_misc:send_body(CD, Slogan, Body).send_msg(_, [], []) ->    ok;send_msg(CH, Reqs, Serials) ->    case megaco_config:lookup_local_conn(CH) of	[ConnData] ->	    do_send_msg(ConnData, lists:reverse(Reqs), Serials);	[] ->	    ok    end.do_send_msg(CD, Reqs0, []) ->    ?d("do_send_msg -> entry with"	"~n   length(Reqs0): ~p", [length(Reqs0)]),        Reqs = [{transactionRequest, Req} || {_, Req} <- Reqs0],    %% ?d("do_send_msg -> Reqs: ~n~p", [Reqs]),    Body = {transactions, Reqs},    megaco_messenger_misc:send_body(CD, "send trans reqs", Body);do_send_msg(CD, [], SerialRanges) ->    ?d("do_send_msg -> entry with" 	"~n   SerialRanges: ~p", [SerialRanges]),        Acks = make_acks(ranges(SerialRanges), []),    %% ?d("do_send_msg -> Reqs: ~n~p", [Reqs]),    Body = {transactions, [{transactionResponseAck, Acks}]},    megaco_messenger_misc:send_body(CD, "send trans acks", Body);do_send_msg(CD, Reqs0, SerialRanges) ->    ?d("do_send_msg -> entry with"	"~n   length(Reqs0): ~p" 	"~n   SerialRanges:  ~p", [length(Reqs0), SerialRanges]),        Acks = make_acks(ranges(SerialRanges), []),    Reqs = [{transactionRequest, Req} || {_, Req} <- Reqs0],    %% ?d("do_send_msg -> Reqs: ~n~p", [Reqs]),    Body = {transactions, [{transactionResponseAck, Acks}|Reqs]},    megaco_messenger_misc:send_body(CD, "send trans reqs and acks", Body).handle_send_result(ok) ->    ok;handle_send_result({ok, _}) ->    ok;handle_send_result({error, {send_message_cancelled, _Reason}}) ->    ok;handle_send_result({error, {send_message_failed, Reason}}) ->    error_msg("Failed sending message: ~n   ~p", [Reason]),    error;handle_send_result(Error) ->    error_msg("Failed sending message: ~n   ~p", [Error]),    error.ranges(L) ->    lists:reverse(ranges(lists:sort(L), [], [])).ranges([], Range, Ranges) ->    ranges2(Range, Ranges);ranges([S1|Sn], [S2|_] = Range, Ranges) when S1 == (S2+1) ->    ranges(Sn, [S1|Range], Ranges);ranges([S|Sn], Range, Ranges) ->    ranges(Sn, [S], ranges2(Range, Ranges)).ranges2([], Ranges) ->     Ranges;ranges2([S], Ranges) ->    [{S,S}|Ranges];ranges2(Range0, Ranges) ->    Range = lists:reverse(Range0),    [{hd(Range),lists:last(Range)}|Ranges].make_acks([], Acks) ->    lists:reverse(Acks);make_acks([{S,S}|SerialRanges], Acks) ->    TRA = #'TransactionAck'{firstAck = S},    make_acks(SerialRanges, [TRA|Acks]);make_acks([{F,L}|SerialRanges], Acks) ->    TRA = #'TransactionAck'{firstAck = F, lastAck = L},    make_acks(SerialRanges, [TRA|Acks]).%%%-----------------------------------------------------------------to(To, Start) ->    To - (t() - Start).%% Time in milli secondst() ->    {A,B,C} = erlang:now(),    A*1000000000+B*1000+(C div 1000).warning_msg(F, A) ->    ?megaco_warning("Transaction sender: " ++ F, A). error_msg(F, A) ->    ?megaco_error("Transaction sender: " ++ F, A). %%%-----------------------------------------------------------------%%% System messages handled here%%%-----------------------------------------------------------------system_continue(_Parent, _Dbg, {S,To}) ->    loop(S, To).system_terminate(Reason, _Parent, _Dbg, {S, _}) ->    #state{conn_handle = CH, reqs = Reqs, acks = Acks} = S,    send_msg(CH, Reqs, Acks),    exit(Reason).system_code_change(S, _Module, _OLdVsn, _Extra) ->    ?d("system_code_change -> entry", []),    {ok, S}.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -