pullersupplier_impl.erl
来自「OTP是开放电信平台的简称」· ERL 代码 · 共 914 行 · 第 1/3 页
ERL
914 行
%%            HasEvent - boolean (out-type)
%%-----------------------------------------------------------
try_pull(_OE_THIS, _OE_FROM, State) when ?is_ANY(State) ->
    case ?get_Event(State) of
        {[], _} ->
            %% Nothing to deliver: hand back a null any together with 'false'.
            {reply, {any:create(orber_tc:null(), null), false}, State};
        {Event, Bool} ->
            {reply, {Event, Bool}, State}
    end;
try_pull(_,_,_) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).

%%----- Inherit from CosNotifyComm::SequencePullSupplier ----
%%----------------------------------------------------------%
%% function : disconnect_sequence_pull_supplier
%% Arguments: -
%% Returns  : ok
%%-----------------------------------------------------------
disconnect_sequence_pull_supplier(_OE_THIS, _OE_FROM, State) ->
    {stop, normal, ok, ?set_Unconnected(State)}.

%%----------------------------------------------------------%
%% function : pull_structured_events
%% Arguments: Max - long()
%% Returns  : [StructuredEvent, ..]
%%            If no batch can be delivered right away, the pacing
%%            timer is started, {OE_FROM, Max} is recorded with
%%            ?set_RespondTo and no reply is sent from here.
%%            Raises BAD_OPERATION unless ?is_SEQUENCE(State) holds.
%%-----------------------------------------------------------
pull_structured_events(_OE_THIS, OE_FROM, State, Max) when ?is_SEQUENCE(State) ->
    case ?is_BatchLimitReached(State, Max) of
        true ->
            %% This test is not fool-proof; if Events have been stored
            %% using StartTime they will still be there but we cannot
            %% deliver them anyway. To solve this "problem" would cost!
            %% Hence, since it works fine otherwise it will do.
            case ?get_Events(State, Max) of
                {[], false} ->
                    NewState = start_timer(State),
                    {noreply, ?set_RespondTo(NewState, {OE_FROM, Max})};
                {Event,_} ->
                    {reply, Event, State}
            end;
        _->
            NewState = start_timer(State),
            {noreply, ?set_RespondTo(NewState, {OE_FROM, Max})}
    end;
pull_structured_events(_,_,_,_) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).

%%----------------------------------------------------------%
%% function : try_pull_structured_events
%% Arguments: Max - long()
%% Returns  : [StructuredEvent, ..]
%%            HasEvent - Boolean()
%%-----------------------------------------------------------
try_pull_structured_events(_OE_THIS, _OE_FROM, State, Max) when ?is_SEQUENCE(State) ->
    {reply, ?get_Events(State, Max), State};
try_pull_structured_events(_,_,_,_) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).

%%----- Inherit from CosNotifyComm::StructuredPullSupplier --
%%----------------------------------------------------------%
%% function : disconnect_structured_pull_supplier
%% Arguments: -
%% Returns  : ok
%%-----------------------------------------------------------
disconnect_structured_pull_supplier(_OE_THIS, _OE_FROM, State) ->
    {stop, normal, ok, ?set_Unconnected(State)}.

%%----------------------------------------------------------%
%% function : pull_structured_event
%% Arguments: -
%% Returns  : The event returned by ?get_Event. If there is none,
%%            OE_FROM is recorded with ?set_RespondTo and no reply
%%            is sent from here.
%%            Raises BAD_OPERATION unless ?is_STRUCTURED(State) holds.
%%-----------------------------------------------------------
pull_structured_event(_OE_THIS, OE_FROM, State) when ?is_STRUCTURED(State) ->
    case ?get_Event(State) of
        {[], _} ->
            {noreply, ?set_RespondTo(State, OE_FROM)};
        {Event,_} ->
            {reply, Event, State}
    end;
pull_structured_event(_,_,_) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).

%%----------------------------------------------------------%
%% function : try_pull_structured_event
%% Arguments: -
%% Returns  : {Event, HasEvent}. When ?get_Event yields no event, a
%%            placeholder built with ?not_CreateSE is returned
%%            together with 'false'.
%%            Raises BAD_OPERATION unless ?is_STRUCTURED(State) holds.
%%-----------------------------------------------------------
try_pull_structured_event(_OE_THIS, _OE_FROM, State) when ?is_STRUCTURED(State) ->
    case ?get_Event(State) of
        {[], _} ->
            {reply,
             {?not_CreateSE("","","",[],[],any:create(orber_tc:null(), null)), false},
             State};
        {Event, Bool} ->
            {reply, {Event, Bool}, State}
    end;
try_pull_structured_event(_,_,_) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).

%%--------------- LOCAL FUNCTIONS ----------------------------
%% Extract the object from a {value, {Key, Obj}} search result; any other
%% term is turned into a FilterNotFound exception tuple (returned, not raised).
find_obj({value, {_, Obj}}) -> Obj;
find_obj(_) -> {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}}.

%% Collect the first element of every {Id, _} tuple in List. The result is
%% in reverse order compared to List. Raises INTERNAL if List is malformed.
find_ids(List) ->
    find_ids(List, []).
find_ids([], Acc) ->
    Acc;
find_ids([{I,_}|T], Acc) ->
    find_ids(T, [I|Acc]);
find_ids(_, _) ->
    corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO}).

%% Delete a single object.
%% If the two lists do not differ, i.e., no filter was removed, raise an
%% exception; otherwise return the first list.
delete_obj(List,List) -> corba:raise(#'CosNotifyFilter_FilterNotFound'{});
delete_obj(List,_) -> List.

%%-----------------------------------------------------------
%% function : callSeq
%% Arguments: EventsIn - the incoming events, passed to
%%                       cosNotification_eventDB:validate_event/5
%%            Status   - passed on to validate_event/5 unchanged
%% Returns  : {noreply, State}
%%            'ok' is replied to OE_FROM before the events are processed.
%%            A client recorded as waiting is answered with corba:reply/2.
%%-----------------------------------------------------------
callSeq(_OE_THIS, OE_FROM, State, EventsIn, Status) ->
    %% We should do something here, i.e., see what QoS this Object offers and
    %% act accordingly.
    corba:reply(OE_FROM, ok),
    case cosNotification_eventDB:validate_event(?get_SubscribeData(State), EventsIn,
                                                ?get_AllFilter(State),
                                                ?get_SubscribeDB(State),
                                                Status) of
        {[], _} ->
            ?DBG("PROXY NOSUBSCRIPTION SEQUENCE/STRUCTURED: ~p~n",[EventsIn]),
            {noreply, State};
        %% Just one event and we got a client waiting => there is no need to store
        %% the event, just transform it and pass it on.
        {[Event], _} when ?is_ANY(State), ?is_Waiting(State) ->
            ?DBG("PROXY RECEIVED SEQUENCE[1]==>ANY: ~p~n",[Event]),
            AnyEvent = any:create('CosNotification_StructuredEvent':tc(),Event),
            case ?addAndGet_Event(State, AnyEvent) of
                {[], _} ->
                    ?DBG("PROXY RECEIVED UNDELIVERABLE SEQUENCE[1]: ~p~n", [Event]),
                    %% Cannot deliver the event at the moment; perhaps Starttime
                    %% set or Deadline passed.
                    {noreply, State};
                {PossiblyOtherEvent, _} ->
                    ?DBG("PROXY RECEIVED SEQUENCE[1] ~p; DELIVER: ~p~n",
                         [Event, PossiblyOtherEvent]),
                    corba:reply(?get_RespondTo(State), PossiblyOtherEvent),
                    {noreply, ?reset_RespondTo(State)}
            end;
        {[Event],_} when ?is_STRUCTURED(State), ?is_Waiting(State) ->
            case ?addAndGet_Event(State, Event) of
                {[], _} ->
                    ?DBG("PROXY RECEIVED UNDELIVERABLE SEQUENCE[1]: ~p~n", [Event]),
                    %% Cannot deliver the event at the moment; perhaps Starttime
                    %% set or Deadline passed.
                    {noreply, State};
                {PossiblyOtherEvent, _} ->
                    ?DBG("PROXY RECEIVED SEQUENCE[1] ~p; DELIVER: ~p~n",
                         [Event, PossiblyOtherEvent]),
                    corba:reply(?get_RespondTo(State), PossiblyOtherEvent),
                    {noreply, ?reset_RespondTo(State)}
            end;
        %% A sequence of events => store them and extract the first (according to QoS)
        %% event and forward it.
        {Events,_} when ?is_ANY(State), ?is_Waiting(State) ->
            ?DBG("PROXY RECEIVED SEQUENCE==>ANY: ~p~n",[Events]),
            store_events(State, Events),
            case ?get_Event(State) of
                {[], _} ->
                    {noreply, State};
                {AnyEvent, _} ->
                    corba:reply(?get_RespondTo(State), AnyEvent),
                    {noreply, ?reset_RespondTo(State)}
            end;
        {Events, _} when ?is_STRUCTURED(State), ?is_Waiting(State) ->
            ?DBG("PROXY RECEIVED SEQUENCE: ~p~n",[Events]),
            store_events(State, Events),
            case ?get_Event(State) of
                {[], _} ->
                    {noreply, State};
                {StrEvent, _} ->
                    %% The waiting client asked for ONE structured event, so
                    %% answer with the event just extracted and not with the
                    %% whole incoming list.
                    corba:reply(?get_RespondTo(State), StrEvent),
                    {noreply, ?reset_RespondTo(State)}
            end;
        {Events, _} when ?is_SEQUENCE(State), ?is_Waiting(State) ->
            ?DBG("PROXY RECEIVED SEQUENCE: ~p~n",[Events]),
            %% Store them first and extract Max events in QoS order.
            store_events(State, Events),
            {RespondTo, Max} = ?get_RespondTo(State),
            case ?is_BatchLimitReached(State, Max) of
                true ->
                    {EventSeq, _} = ?get_Events(State, Max),
                    corba:reply(RespondTo, EventSeq),
                    stop_timer(State),
                    {noreply, ?reset_RespondTo(State)};
                _->
                    {noreply, State}
            end;
        %% No client waiting. Store the event(s).
        {Events, _} ->
            ?DBG("PROXY RECEIVED SEQUENCE: ~p~n",[Events]),
            store_events(State, Events),
            {noreply, State}
    end.

%% Add every event in the list with ?add_Event. When ?is_ANY(State) holds,
%% each structured event is first wrapped in an any. Always returns ok.
store_events(_State, []) ->
    ok;
store_events(State, [Event|Rest]) when ?is_ANY(State) ->
    AnyEvent = any:create('CosNotification_StructuredEvent':tc(),Event),
    ?add_Event(State,AnyEvent),
    store_events(State, Rest);
store_events(State, [Event|Rest]) ->
    ?add_Event(State,Event),
    store_events(State, Rest).

%%-----------------------------------------------------------
%% function : callAny
%% Arguments: EventIn - the incoming event, passed to
%%                      cosNotification_eventDB:validate_event/5
%%            Status  - passed on to validate_event/5 unchanged
%% Returns  : {noreply, State}
%%            'ok' is replied to OE_FROM before the event is processed.
%%            A client recorded as waiting is answered with corba:reply/2.
%%-----------------------------------------------------------
callAny(_OE_THIS, OE_FROM, State, EventIn, Status) ->
    corba:reply(OE_FROM, ok),
    case cosNotification_eventDB:validate_event(?get_SubscribeData(State), EventIn,
                                                ?get_AllFilter(State),
                                                ?get_SubscribeDB(State),
                                                Status) of
        {[],_} ->
            ?DBG("PROXY NOSUBSCRIPTION ANY: ~p~n",[EventIn]),
            {noreply, State};
        {Event,_} when ?is_ANY(State), ?is_Waiting(State) ->
            ?DBG("PROXY RECEIVED ANY: ~p~n",[Event]),
            case ?addAndGet_Event(State, Event) of
                {[],_} ->
                    %% Unable to deliver the event (Starttime, Deadline etc).
                    {noreply, State};
                {MaybeOtherEvent , _} ->
                    corba:reply(?get_RespondTo(State), MaybeOtherEvent),
                    {noreply, ?reset_RespondTo(State)}
            end;
        {Event,_} when ?is_ANY(State) ->
            ?DBG("PROXY RECEIVED ANY: ~p~n",[Event]),
            ?add_Event(State,Event),
            {noreply, State};
        {Event,_} when ?is_STRUCTURED(State), ?is_Waiting(State) ->
            ?DBG("PROXY RECEIVED ANY==>STRUCTURED: ~p~n",[Event]),
            case ?addAndGet_Event(State, ?not_CreateSE("","%ANY","",[],[],Event)) of
                {[],_} ->
                    %% Unable to deliver the event (Starttime, Deadline etc).
                    {noreply, State};
                {MaybeOtherEvent , _} ->
                    corba:reply(?get_RespondTo(State), MaybeOtherEvent),
                    {noreply, ?reset_RespondTo(State)}
            end;
        {Event,_} when ?is_SEQUENCE(State), ?is_Waiting(State) ->
            ?DBG("PROXY RECEIVED ANY==>SEQUENCE[1]: ~p~n",[Event]),
            ?add_Event(State,?not_CreateSE("","%ANY","",[],[],Event)),
            {RespondTo, Max} = ?get_RespondTo(State),
            case ?is_BatchLimitReached(State, Max) of
                true ->
                    {EventSeq, _} = ?get_Events(State, Max),
                    corba:reply(RespondTo, EventSeq),
                    stop_timer(State),
                    {noreply, ?reset_RespondTo(State)};
                _ ->
                    {noreply, State}
            end;
        {Event,_} ->
            ?DBG("PROXY RECEIVED ANY==>STRUCTURED/SEQUENCE: ~p~n",[Event]),
            ?add_Event(State,?not_CreateSE("","%ANY","",[],[],Event)),
            {noreply, State}
    end.

%% Start a timer which sends a {pacing, TS} message to this process when the
%% pacing interval has elapsed. Only used when this object is defined to
%% supply sequences. Returns the state updated with ?set_PacingTimer, or
%% raises INTERNAL if the timer could not be started.
start_timer(State) ->
    %% now/0 is kept on purpose: TS tags the pacing message sent below.
    TS = now(),
    case catch timer:send_after(timer:seconds(?get_PacingInterval(State)),
                                {pacing, TS}) of
        {ok,PacTRef} ->
            ?DBG("PULL SUPPLIER STARTED TIMER, BATCH LIMIT: ~p~n",
                 [?get_BatchLimit(State)]),
            ?set_PacingTimer(State, {PacTRef, TS});
        What ->
            orber:dbg("[~p] PullerSupplier:start_timer();~n"
                      "Unable to invoke timer:send_after/2: ~p",
                      [?LINE, What], ?DEBUG_LEVEL),
            corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO})
    end.

%% Cancel the pacing timer, if one is recorded in State. The state itself is
%% not modified here.
stop_timer(State) ->
    case ?get_PacingTimer(State) of
        undefined ->
            ok;
        {Timer, _} ->
            ?DBG("PULL SUPPLIER STOPPED TIMER~n",[]),
            timer:cancel(Timer)
    end.

%%--------------- MISC FUNCTIONS, E.G. DEBUGGING -------------
%%--------------- END OF MODULE ------------------------------
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?