pullersupplier_impl.erl

来自「OTP是开放电信平台的简称」· ERL 代码 · 共 914 行 · 第 1/3 页

ERL
914
字号
%%            HasEvent - boolean (out-type)%%-----------------------------------------------------------try_pull(_OE_THIS, _OE_FROM, State) when ?is_ANY(State) ->    case ?get_Event(State) of	{[], _} ->	    {reply, {any:create(orber_tc:null(), null), false}, State};	{Event, Bool} ->	    {reply, {Event, Bool}, State}    end;try_pull(_,_,_) ->    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).%%----- Inherit from CosNotifyComm::SequencePullSupplier ----%%----------------------------------------------------------%%% function : disconnect_sequence_pull_supplier%% Arguments: -%% Returns  : ok%%-----------------------------------------------------------disconnect_sequence_pull_supplier(_OE_THIS, _OE_FROM, State) ->    {stop, normal, ok, ?set_Unconnected(State)}.%%----------------------------------------------------------%%% function : pull_structured_events%% Arguments: Max - long()%% Returns  : [StructuredEvent, ..]%%-----------------------------------------------------------pull_structured_events(_OE_THIS, OE_FROM, State, Max) when ?is_SEQUENCE(State) ->    case ?is_BatchLimitReached(State, Max) of	true ->	    %% This test is not fool-proof; if Events have been stored	    %% using StartTime they will still be there but we cannot	    %% deliver them anyway. To solve this "problem" would cost!	    %% Hence, since it works fine otherwise it will do.	    case ?get_Events(State, Max) of		{[], false} ->		    NewState = start_timer(State),		    {noreply, ?set_RespondTo(NewState, {OE_FROM, Max})};		{Event,_} ->		    {reply, Event, State}	    end;	_->	    NewState = start_timer(State),	    {noreply, ?set_RespondTo(NewState, {OE_FROM, Max})}    end;pull_structured_events(_,_,_,_) ->    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).%%----------------------------------------------------------%%% function : try_pull_structured_events%% Arguments: Max - long()%% Returns  : [StructuredEvent, ..]%%            HasEvent - Boolean()%%-----------------------------------------------------------try_pull_structured_events(_OE_THIS, _OE_FROM, State, Max) when ?is_SEQUENCE(State) ->    {reply, ?get_Events(State, Max), State};try_pull_structured_events(_,_,_,_) ->    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).%%----- Inherit from CosNotifyComm::StructuredPullSupplier --%%----------------------------------------------------------%%% function : disconnect_structured_pull_supplier%% Arguments: -%% Returns  : ok%%-----------------------------------------------------------disconnect_structured_pull_supplier(_OE_THIS, _OE_FROM, State) ->    {stop, normal, ok, ?set_Unconnected(State)}.%%----------------------------------------------------------%%% function : pull_structured_event%% Arguments: -%% Returns  : %%-----------------------------------------------------------pull_structured_event(_OE_THIS, OE_FROM, State) when ?is_STRUCTURED(State) ->    case ?get_Event(State) of	{[], _} ->	    {noreply, ?set_RespondTo(State, OE_FROM)};	{Event,_} ->	    {reply, Event, State}    end;pull_structured_event(_,_,_) ->    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).%%----------------------------------------------------------%%% function : try_pull_structured_event%% Arguments: -%% Returns  : %%-----------------------------------------------------------try_pull_structured_event(_OE_THIS, _OE_FROM, State) when ?is_STRUCTURED(State) ->    case ?get_Event(State) of	{[], _} ->	    {reply, 	     {?not_CreateSE("","","",[],[],any:create(orber_tc:null(), null)), false}, 	     State};	{Event, Bool} ->	    {reply, {Event, Bool}, State}    end;try_pull_structured_event(_,_,_) ->    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).%%--------------- LOCAL FUNCTIONS ----------------------------find_obj({value, {_, Obj}}) -> Obj;find_obj(_) -> {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}}.find_ids(List) ->           find_ids(List, []).find_ids([], Acc) ->        Acc;find_ids([{I,_}|T], Acc) -> find_ids(T, [I|Acc]);find_ids(_, _) -> corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO}).%% Delete a single object.%% The list do not differ, i.e., no filter removed, raise exception.delete_obj(List,List) -> corba:raise(#'CosNotifyFilter_FilterNotFound'{});delete_obj(List,_) -> List.%%-----------------------------------------------------------%% function : callSeq%% Arguments: %% Returns  : %%-----------------------------------------------------------callSeq(_OE_THIS, OE_FROM, State, EventsIn, Status) ->    %% We should do something here, i.e., see what QoS this Object offers and    %% act accordingly.    corba:reply(OE_FROM, ok),    case cosNotification_eventDB:validate_event(?get_SubscribeData(State), EventsIn,						?get_AllFilter(State),						?get_SubscribeDB(State),						Status) of	{[], _} ->	    ?DBG("PROXY NOSUBSCRIPTION SEQUENCE/STRUCTURED: ~p~n",[EventsIn]),	    {noreply, State};	%% Just one event and we got a client waiting => there is no need to store	%% the event, just transform it and pass it on.	{[Event], _} when ?is_ANY(State), ?is_Waiting(State) ->	    ?DBG("PROXY RECEIVED SEQUENCE[1]==>ANY: ~p~n",[Event]),	    AnyEvent = any:create('CosNotification_StructuredEvent':tc(),Event),	    case ?addAndGet_Event(State, AnyEvent) of		{[], _} ->		    ?DBG("PROXY RECEIVED UNDELIVERABLE SEQUENCE[1]: ~p~n",				 [Event]),		    %% Cannot deliver the event at the moment; perhaps Starttime		    %% set or Deadline passed.		    {noreply, State};		{PossiblyOtherEvent, _} ->		    ?DBG("PROXY RECEIVED SEQUENCE[1] ~p; DELIVER: ~p~n",				 [Event, PossiblyOtherEvent]),		    corba:reply(?get_RespondTo(State), PossiblyOtherEvent),		    {noreply, ?reset_RespondTo(State)}	    end;	{[Event],_} when ?is_STRUCTURED(State), ?is_Waiting(State) ->	    case ?addAndGet_Event(State, Event) of		{[], _} ->		    ?DBG("PROXY RECEIVED UNDELIVERABLE SEQUENCE[1]: ~p~n",				 [Event]),		    %% Cannot deliver the event at the moment; perhaps Starttime		    %% set or Deadline passed.		    {noreply, State};		{PossiblyOtherEvent, _} ->		    ?DBG("PROXY RECEIVED SEQUENCE[1] ~p; DELIVER: ~p~n",				 [Event, PossiblyOtherEvent]),		    corba:reply(?get_RespondTo(State), PossiblyOtherEvent),		    {noreply, ?reset_RespondTo(State)}	    end;	%% A sequence of events => store them and extract the first (according to QoS)	%% event and forward it.	{Events,_} when ?is_ANY(State), ?is_Waiting(State) ->	    ?DBG("PROXY RECEIVED SEQUENCE==>ANY: ~p~n",[Events]),	    store_events(State, Events),	    case ?get_Event(State) of		{[], _} ->		    {noreply, State};		{AnyEvent, _} ->		    corba:reply(?get_RespondTo(State), AnyEvent),		    {noreply, ?reset_RespondTo(State)}	    end;	{Events, _} when ?is_STRUCTURED(State), ?is_Waiting(State) ->	    ?DBG("PROXY RECEIVED SEQUENCE: ~p~n",[Events]),	    store_events(State, Events),	    case ?get_Event(State) of		{[], _} ->		    {noreply, State};		{_StrEvent, _} ->		    corba:reply(?get_RespondTo(State), Events),		    {noreply, ?reset_RespondTo(State)}	    end;	{Events, _} when ?is_SEQUENCE(State), ?is_Waiting(State) ->	    ?DBG("PROXY RECEIVED SEQUENCE: ~p~n",[Events]),	    %% Store them first and extract Max events in QoS order.	    store_events(State, Events),	    {RespondTo, Max} = ?get_RespondTo(State),	    case ?is_BatchLimitReached(State, Max) of		true ->		    {EventSeq, _} = ?get_Events(State, Max),		    corba:reply(RespondTo, EventSeq),		    stop_timer(State),		    {noreply, ?reset_RespondTo(State)};		_->		   {noreply, State}	    end;	%% No client waiting. Store the event(s).	{Events, _} ->	    ?DBG("PROXY RECEIVED SEQUENCE: ~p~n",[Events]),	    store_events(State, Events),	    {noreply, State}    end.store_events(_State, []) ->    ok;store_events(State, [Event|Rest]) when ?is_ANY(State) ->    AnyEvent = any:create('CosNotification_StructuredEvent':tc(),Event),    ?add_Event(State,AnyEvent),    store_events(State, Rest);store_events(State, [Event|Rest]) ->    ?add_Event(State,Event),    store_events(State, Rest).%%-----------------------------------------------------------%% function : callAny%% Arguments: %% Returns  : %%-----------------------------------------------------------callAny(_OE_THIS, OE_FROM, State, EventIn, Status) ->    corba:reply(OE_FROM, ok),    case cosNotification_eventDB:validate_event(?get_SubscribeData(State), EventIn,						?get_AllFilter(State),						?get_SubscribeDB(State),						Status) of	{[],_} ->	    ?DBG("PROXY NOSUBSCRIPTION ANY: ~p~n",[EventIn]),	    {noreply, State};	{Event,_} when ?is_ANY(State), ?is_Waiting(State) ->	    ?DBG("PROXY RECEIVED ANY: ~p~n",[Event]),	    case ?addAndGet_Event(State, Event) of		{[],_} ->		    %% Unable to deliver the event (Starttime, Deadline etc).		    {noreply, State};		{MaybeOtherEvent , _} ->		    corba:reply(?get_RespondTo(State), MaybeOtherEvent),		    {noreply, ?reset_RespondTo(State)}	    end;	{Event,_} when ?is_ANY(State) ->	    ?DBG("PROXY RECEIVED ANY: ~p~n",[Event]),	    ?add_Event(State,Event),	    {noreply, State};	{Event,_} when ?is_STRUCTURED(State), ?is_Waiting(State) ->	    ?DBG("PROXY RECEIVED ANY==>STRUCTURED: ~p~n",[Event]),	    case ?addAndGet_Event(State, ?not_CreateSE("","%ANY","",[],[],Event)) of		{[],_} ->		    %% Unable to deliver the event (Starttime, Deadline etc).		    {noreply, State};		{MaybeOtherEvent , _} ->		    corba:reply(?get_RespondTo(State), MaybeOtherEvent),		    {noreply, ?reset_RespondTo(State)}	    end;	{Event,_} when ?is_SEQUENCE(State), ?is_Waiting(State) ->	    ?DBG("PROXY RECEIVED ANY==>SEQUENCE[1]: ~p~n",[Event]),	    ?add_Event(State,?not_CreateSE("","%ANY","",[],[],Event)),	    {RespondTo, Max} = ?get_RespondTo(State),	    case ?is_BatchLimitReached(State, Max) of		true ->		    {EventSeq, _} = ?get_Events(State, Max),		    corba:reply(RespondTo, EventSeq),		    stop_timer(State),		    {noreply, ?reset_RespondTo(State)};		_ ->		    {noreply, State}	    end;	{Event,_} ->	    ?DBG("PROXY RECEIVED ANY==>STRUCTURED/SEQUENCE: ~p~n",[Event]),	    ?add_Event(State,?not_CreateSE("","%ANY","",[],[],Event)),	    {noreply, State}    end.	    %% Start timers which send a message each time we should push events. Only used%% when this objects is defined to supply sequences.start_timer(State) ->    TS = now(),    case catch timer:send_after(timer:seconds(?get_PacingInterval(State)), 				{pacing, TS}) of	{ok,PacTRef} ->	    ?DBG("PULL SUPPLIER STARTED TIMER, BATCH LIMIT: ~p~n",			 [?get_BatchLimit(State)]),	    ?set_PacingTimer(State, {PacTRef, TS});	What ->	    orber:dbg("[~p] PullerSupplier:start_timer();~n"		      "Unable to invoke timer:send_interval/2: ~p", 		      [?LINE, What], ?DEBUG_LEVEL),	    corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO})    end.stop_timer(State) ->    case ?get_PacingTimer(State) of	undefined ->	    ok;	{Timer, _} ->	    ?DBG("PULL SUPPLIER STOPPED TIMER~n",[]),	    timer:cancel(Timer)    end.%%--------------- MISC FUNCTIONS, E.G. DEBUGGING -------------%%--------------- END OF MODULE ------------------------------

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?