pullerconsumer_impl.erl

来自「OTP是开放电信平台的简称」· ERL 代码 · 共 773 行 · 第 1/3 页

ERL
773
字号
    ?add_Publish(State, H),
    update_publish(add, State, T);
update_publish(remove, State, [H|T]) ->
    ?del_Publish(State, H),
    update_publish(remove, State, T).

%%----- Inherit from CosNotifyFilter::FilterAdmin -----------
%%----------------------------------------------------------%
%% function : add_filter
%% Arguments: Filter - CosNotifyFilter::Filter
%% Returns  : FilterID - long
%% Effect   : Type-checks Filter, allocates a new id with ?new_Id/1,
%%            records that id as the current id counter and stores the
%%            filter under it in the returned state.
%%-----------------------------------------------------------
add_filter(_OE_THIS, _OE_FROM, State, Filter) ->
    'CosNotification_Common':type_check(Filter, 'CosNotifyFilter_Filter'),
    FilterID = ?new_Id(State),
    NewState = ?set_IdCounter(State, FilterID),
    {reply, FilterID, ?add_Filter(NewState, FilterID, Filter)}.

%%----------------------------------------------------------%
%% function : remove_filter
%% Arguments: FilterID - long
%% Returns  : ok
%% Raises   : 'BAD_PARAM' (via corba:raise/1) when FilterID is not an
%%            integer. What ?del_Filter/2 does for an unknown id is
%%            defined by the macro, which is not visible here.
%%-----------------------------------------------------------
remove_filter(_OE_THIS, _OE_FROM, State, FilterID) when is_integer(FilterID) ->
    {reply, ok, ?del_Filter(State, FilterID)};
remove_filter(_,_,_,What) ->
    orber:dbg("[~p] PullerConsumer:remove_filter(~p); Not an integer",
              [?LINE, What], ?DEBUG_LEVEL),
    corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}).

%%----------------------------------------------------------%
%% function : get_filter
%% Arguments: FilterID - long
%% Returns  : Filter - CosNotifyFilter::Filter |
%%            {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}}
%% Raises   : 'BAD_PARAM' (via corba:raise/1) when FilterID is not an
%%            integer.
%%-----------------------------------------------------------
get_filter(_OE_THIS, _OE_FROM, State, FilterID) when is_integer(FilterID) ->
    {reply, ?get_Filter(State, FilterID), State};
get_filter(_,_,_,What) ->
    orber:dbg("[~p] PullerConsumer:get_filter(~p); Not an integer",
              [?LINE, What], ?DEBUG_LEVEL),
    corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}).

%%----------------------------------------------------------%
%% function : get_all_filters
%% Arguments: -
%% Returns  : Filter - CosNotifyFilter::FilterIDSeq
%%-----------------------------------------------------------
get_all_filters(_OE_THIS, _OE_FROM, State) ->
    {reply, ?get_AllFilterID(State), State}.

%%----------------------------------------------------------%
%% function : remove_all_filters
%% Arguments: -
%% Returns  : ok
%%-----------------------------------------------------------
remove_all_filters(_OE_THIS, _OE_FROM, State) ->
    {reply, ok, ?del_AllFilter(State)}.

%%----- Inherit from CosEventComm::PullConsumer -------------
%%----------------------------------------------------------%
%% function : disconnect_pull_consumer
%% Arguments: -
%% Returns  : ok
%% Effect   : Replies ok and stops the server with reason normal; the
%%            state is marked unconnected first.
%%-----------------------------------------------------------
disconnect_pull_consumer(_OE_THIS, _OE_FROM, State) ->
    {stop, normal, ok, ?set_Unconnected(State)}.

%%----- Inherit from CosNotifyComm::SequencePullConsumer ----
%%----------------------------------------------------------%
%% function : disconnect_sequence_pull_consumer
%% Arguments: -
%% Returns  : ok
%% Effect   : Replies ok and stops the server with reason normal; the
%%            state is marked unconnected first.
%%-----------------------------------------------------------
disconnect_sequence_pull_consumer(_OE_THIS, _OE_FROM, State) ->
    {stop, normal, ok, ?set_Unconnected(State)}.

%%----- Inherit from CosNotifyComm::StructuredPullConsumer ----
%%----------------------------------------------------------%
%% function : disconnect_structured_pull_consumer
%% Arguments: -
%% Returns  : ok
%% Effect   : Replies ok and stops the server with reason normal; the
%%            state is marked unconnected first.
%%-----------------------------------------------------------
disconnect_structured_pull_consumer(_OE_THIS, _OE_FROM, State) ->
    {stop, normal, ok, ?set_Unconnected(State)}.

%%--------------- LOCAL FUNCTIONS ----------------------------

%% Unwrap a {value, {Key, Obj}} lookup result. Anything else is mapped to
%% a FilterNotFound exception tuple, which is returned (not raised).
find_obj({value, {_, Obj}}) -> Obj;
find_obj(_) -> {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}}.

%% Collect the first element (the id) of every {Id, _} pair in List.
%% The ids come back in reverse list order since they are pushed onto the
%% accumulator. A tail that is neither [] nor [{_,_}|_] is logged and
%% raises 'INTERNAL'.
find_ids(List) ->           find_ids(List, []).

find_ids([], Acc) ->        Acc;
find_ids([{I,_}|T], Acc) -> find_ids(T, [I|Acc]);
find_ids(What, _) ->
    orber:dbg("[~p] PullerConsumer:find_ids();~n"
              "Id corrupt: ~p", [?LINE, What], ?DEBUG_LEVEL),
    corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO}).

%% Delete a single object.
%% If the two lists do not differ, i.e., no filter was removed, raise an
%% exception; otherwise return the first list.
delete_obj(List,List) -> corba:raise(#'CosNotifyFilter_FilterNotFound'{});
delete_obj(List,_) -> List.

%% Start timer which sends a 'pull' message each time we should pull for
%% new events. Returns the state with the timer reference stored; raises
%% 'INTERNAL' if timer:send_interval/2 does not return {ok, TRef}.
start_timer(State) ->
    case catch timer:send_interval(?get_PullInterval(State), pull) of
        {ok,PullTRef} ->
            ?DBG("PULL CONSUMER STARTED PULL TIMER ~p~n",
                 [?get_PullInterval(State)]),
            ?set_PullTimer(State, PullTRef);
        What ->
            orber:dbg("[~p] PullerConsumer:start_timer();~n"
                      "Unable to invoke timer:send_interval/2: ~p",
                      [?LINE, What], ?DEBUG_LEVEL),
            corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO})
    end.

%% Cancel the pull timer stored in State; returns what timer:cancel/1
%% returns.
stop_timer(State) ->
    ?DBG("PULL CONSUMER STOPPED TIMER~n",[]),
    timer:cancel(?get_PullTimer(State)).

%% Try pull event(s); which method is determined by which type this proxy is.
%% Any result other than {Event(s), true}, including a caught exception or
%% exit, means nothing is forwarded and {noreply, State} is returned.
try_pull_events(State) when ?is_ANY(State) ->
    case catch 'CosEventComm_PullSupplier':try_pull(?get_Client(State)) of
        {_,false} ->
            {noreply, State};
        {Event, true} ->
            case ?not_isConvertedStructured(Event) of
                true ->
                    %% The any's value is extracted and forwarded as a
                    %% one-element sequence.
                    forward(seq, State,
                            cosNotification_eventDB:filter_events([any:get_value(Event)],
                                                                  ?get_AllFilter(State)));
                _ ->
                    forward(any, State,
                            cosNotification_eventDB:filter_events([Event],
                                                                  ?get_AllFilter(State)))
            end;
        _->
            {noreply, State}
    end;
try_pull_events(State) when ?is_SEQUENCE(State) ->
    case catch 'CosNotifyComm_SequencePullSupplier':
        try_pull_structured_events(?get_Client(State), ?get_BatchLimit(State)) of
        {_,false} ->
            {noreply, State};
        {EventSeq, true} ->
            %% We cannot convert parts of the sequence to any, even though they
            %% are converted from any to structured. Would be 'impossible' to send.
            forward(seq, State,
                    cosNotification_eventDB:filter_events(EventSeq,
                                                          ?get_AllFilter(State)));
        _->
            {noreply, State}
    end;
try_pull_events(State) when ?is_STRUCTURED(State) ->
    case catch 'CosNotifyComm_StructuredPullSupplier':
        try_pull_structured_event(?get_Client(State)) of
        {_,false} ->
            {noreply, State};
        {Event, true} when ?not_isConvertedAny(Event) ->
            %% Only the remainder_of_body field is forwarded, as an any.
            forward(any, State,
                    cosNotification_eventDB:filter_events([Event#'CosNotification_StructuredEvent'.remainder_of_body],
                                                          ?get_AllFilter(State)));
        {Event, true} ->
            forward(seq, State,
                    cosNotification_eventDB:filter_events([Event],
                                                          ?get_AllFilter(State)));
        _->
            {noreply, State}
    end.

%% Forward events
%% The third argument is the {Passed, Failed} pair produced by
%% cosNotification_eventDB:filter_events/2.
forward(_, State, {[], _}) when ?is_ANDOP(State) ->
    %% Did not pass filtering. Since AND no need to pass on.
    {noreply, State};
forward(Type, State, {[], Failed}) ->
    %% Did not pass filtering, but since OR it may pass Admin filters, hence, pass
    %% on to Admin
    forward(Type, State, Failed, ?get_MyAdmin(State), 'MATCH');
forward(Type, State, {Passed, _}) when ?is_ANDOP(State) ->
    %% Did pass filtering, but since AND we must pass it to Admin to check against
    %% its Filters. Just ignore the ones that failed.
    forward(Type, State, Passed, ?get_MyAdmin(State), 'MATCH');
forward(Type, State, {Passed, []}) ->
    %% Did pass filtering, and since OR we can pass it to the Channel directly.
    forward(Type, State, Passed, ?get_MyChannel(State), 'MATCHED');
forward(Type, State, {Passed, Failed}) ->
    %% Some passed filtering, and since OR we can pass the ones that passed directly
    %% to the channel and the other ones via the admin.
    %% The outcome of the channel call must not be discarded: if it decided
    %% to stop (and already sent the termination notification), return that
    %% stop instead of letting a successful admin call keep the proxy alive.
    case forward(Type, State, Passed, ?get_MyChannel(State), 'MATCHED') of
        {noreply, _} ->
            forward(Type, State, Failed, ?get_MyAdmin(State), 'MATCH');
        Stop ->
            Stop
    end.

%% Deliver Event to SendTo (admin or channel) with the given filter Status.
%% Returns {noreply, State} on success, or when the call fails on a
%% persistent connection (the event is dropped). Returns
%% {stop, normal, State} after notifying when the target raised
%% OBJECT_NOT_EXIST, NO_PERMISSION or Disconnected, or on any other
%% failure when the connection is not persistent.
forward(any, State, [Event], SendTo, Status) ->
    case catch oe_CosNotificationComm_Event:callAny(SendTo, Event, Status) of
        ok ->
            ?DBG("PROXY FORWARD ANY: ~p~n",[Event]),
            {noreply, State};
        {'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') ;
                              is_record(E, 'NO_PERMISSION') ;
                              is_record(E, 'CosEventComm_Disconnected') ->
            orber:dbg("[~p] PullerConsumer:forward();~n"
                      "Admin/Channel no longer exists; terminating and dropping: ~p",
                      [?LINE, Event], ?DEBUG_LEVEL),
            'CosNotification_Common':notify([{proxy, State#state.this},
                                             {client, ?get_Client(State)},
                                             {reason, {'EXCEPTION', E}}]),
            {stop, normal, State};
        R when ?is_PersistentConnection(State) ->
            orber:dbg("[~p] PullerConsumer:forward();~n"
                      "Admin/Channel respond incorrect: ~p~n"
                      "Dropping: ~p", [?LINE, R, Event], ?DEBUG_LEVEL),
            {noreply, State};
        R ->
            orber:dbg("[~p] PullerConsumer:forward();~n"
                      "Admin/Channel respond incorrect: ~p~n"
                      "Terminating and dropping: ~p",
                      [?LINE, R, Event], ?DEBUG_LEVEL),
            'CosNotification_Common':notify([{proxy, State#state.this},
                                             {client, ?get_Client(State)},
                                             {reason, R}]),
            {stop, normal, State}
    end;
forward(seq, State, Event, SendTo, Status) ->
    case catch oe_CosNotificationComm_Event:callSeq(SendTo, Event, Status) of
        ok ->
            ?DBG("PROXY FORWARD SEQUENCE: ~p~n",[Event]),
            {noreply, State};
        {'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') ;
                              is_record(E, 'NO_PERMISSION') ;
                              is_record(E, 'CosEventComm_Disconnected') ->
            orber:dbg("[~p] PullerConsumer:forward();~n"
                      "Admin/Channel no longer exists; terminating and dropping: ~p",
                      [?LINE, Event], ?DEBUG_LEVEL),
            'CosNotification_Common':notify([{proxy, State#state.this},
                                             {client, ?get_Client(State)},
                                             {reason, {'EXCEPTION', E}}]),
            {stop, normal, State};
        R when ?is_PersistentConnection(State) ->
            orber:dbg("[~p] PullerConsumer:forward();~n"
                      "Admin/Channel respond incorrect: ~p~n"
                      "Dropping: ~p", [?LINE, R, Event], ?DEBUG_LEVEL),
            {noreply, State};
        R ->
            orber:dbg("[~p] PullerConsumer:forward();~n"
                      "Admin/Channel respond incorrect: ~p~n"
                      "Terminating and dropping: ~p",
                      [?LINE, R, Event], ?DEBUG_LEVEL),
            'CosNotification_Common':notify([{proxy, State#state.this},
                                             {client, ?get_Client(State)},
                                             {reason, R}]),
            {stop, normal, State}
    end.

%%--------------- MISC FUNCTIONS, E.G. DEBUGGING -------------
%%--------------- END OF MODULE ------------------------------

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?