pushersupplier_impl.erl

来自「OTP是开放电信平台的简称」· ERL 代码 · 共 1,052 行 · 第 1/3 页

ERL
1,052
字号
    {reply, ok, ?set_PrioFil(State, PrioF)}.

%%----------------------------------------------------------%
%% Attribute: '_*et_lifetime_filter'
%% Type     : read/write
%% Returns  : getter - whatever ?get_LifeTFil yields for the state
%%            setter - ok
%%-----------------------------------------------------------
'_get_lifetime_filter'(_OE_THIS, _OE_FROM, State) ->
    LifetimeFilter = ?get_LifeTFil(State),
    {reply, LifetimeFilter, State}.

'_set_lifetime_filter'(_OE_THIS, _OE_FROM, State, LifeTF) ->
    UpdatedState = ?set_LifeTFil(State, LifeTF),
    {reply, ok, UpdatedState}.

%%-----------------------------------------------------------
%%------- Exported external functions -----------------------
%%-----------------------------------------------------------
%%----- CosEventChannelAdmin::ProxyPushSupplier -------------
%%----------------------------------------------------------%
%% function : connect_push_consumer
%% Arguments: Client - CosEventComm::PushConsumer
%% Returns  : ok | {'EXCEPTION', #'AlreadyConnected'{}} |
%%            {'EXCEPTION', #'TypeError'{}}
%%            Both exceptions from CosEventChannelAdmin.
%% Note     : Plain delegation to connect_any_push_consumer/4.
%%-----------------------------------------------------------
connect_push_consumer(OE_THIS, OE_FROM, State, Client) ->
    connect_any_push_consumer(OE_THIS, OE_FROM, State, Client).

%%----- CosNotifyChannelAdmin::ProxyPushSupplier ------------
%%----------------------------------------------------------%
%% function : connect_any_push_consumer
%% Arguments: Client - CosEventComm::PushConsumer
%% Returns  : ok | {'EXCEPTION', #'AlreadyConnected'{}} |
%%            {'EXCEPTION', #'TypeError'{}}
%%            Both exceptions from CosEventChannelAdmin.
%% Note     : Only accepted when the proxy state satisfies ?is_ANY;
%%            any other proxy raises 'BAD_OPERATION'.
%%-----------------------------------------------------------
connect_any_push_consumer(OE_THIS, _OE_FROM, State, Client) when ?is_ANY(State) ->
    'CosNotification_Common':type_check(Client, 'CosEventComm_PushConsumer'),
    ok = raise_if_connected(State),
    {reply, ok, State#state{client = Client, this = OE_THIS}};
connect_any_push_consumer(_, _, _, _) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).

%%----- CosNotifyChannelAdmin::SequenceProxyPushSupplier ----
%%----------------------------------------------------------%
%% function : connect_sequence_push_consumer
%% Arguments: Client - CosNotifyComm::SequencePushConsumer
%% Returns  : ok | {'EXCEPTION', #'AlreadyConnected'{}} |
%%            {'EXCEPTION', #'TypeError'{}}
%% Note     : Only accepted when the proxy state satisfies ?is_SEQUENCE.
%%            The timer is started before the client is stored, and
%%            only once the "already connected" check has passed.
%%-----------------------------------------------------------
connect_sequence_push_consumer(OE_THIS, _OE_FROM, State, Client) when ?is_SEQUENCE(State) ->
    'CosNotification_Common':type_check(Client,
                                        'CosNotifyComm_SequencePushConsumer'),
    ok = raise_if_connected(State),
    TimedState = start_timer(State),
    {reply, ok, TimedState#state{client = Client, this = OE_THIS}};
connect_sequence_push_consumer(_, _, _, _) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).

%%----- CosNotifyChannelAdmin::StructuredProxyPushSupplier ---
%%----------------------------------------------------------%
%% function : connect_structured_push_consumer
%% Arguments: Client - CosNotifyComm::StructuredPushConsumer
%% Returns  : ok | {'EXCEPTION', #'AlreadyConnected'{}} |
%%            {'EXCEPTION', #'TypeError'{}}
%% Note     : Only accepted when the proxy state satisfies ?is_STRUCTURED.
%%-----------------------------------------------------------
connect_structured_push_consumer(OE_THIS, _OE_FROM, State, Client) when ?is_STRUCTURED(State) ->
    'CosNotification_Common':type_check(Client,
                                        'CosNotifyComm_StructuredPushConsumer'),
    ok = raise_if_connected(State),
    {reply, ok, State#state{client = Client, this = OE_THIS}};
connect_structured_push_consumer(_, _, _, _) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).

%% Shared "already connected" check for the connect_* operations above.
%% Raises CosEventChannelAdmin::AlreadyConnected when ?is_Connected holds
%% for the state, otherwise returns ok.
raise_if_connected(State) when ?is_Connected(State) ->
    corba:raise(#'CosEventChannelAdmin_AlreadyConnected'{});
raise_if_connected(_State) ->
    ok.

%%----- CosNotifyChannelAdmin::*ProxyPushSupplier -----------
%%----------------------------------------------------------%
%% function : suspend_connection
%% Arguments: 
%% Returns  : ok
%%            | {'EXCEPTION', #'ConnectionAlreadyInactive'{}} |
%%            {'EXCEPTION', #'NotConnected'{}}
%% Note     : Stops the pacing timer and marks the proxy as suspended.
%%-----------------------------------------------------------
suspend_connection(_OE_THIS, _OE_FROM, State) when ?is_Connected(State) ->
    if
        ?is_Suspended(State) ->
            corba:raise(#'CosNotifyChannelAdmin_ConnectionAlreadyInactive'{});
        true ->
            stop_timer(State#state.pacingTimer),
            {reply, ok, State#state{pacingTimer = undefined,
                                    suspended=true}}
    end;
suspend_connection(_,_,_)->
    corba:raise(#'CosNotifyChannelAdmin_NotConnected'{}).

%%----------------------------------------------------------%
%% function : resume_connection
%% Arguments: 
%% Returns  : ok | {'EXCEPTION', #'ConnectionAlreadyActive'{}} |
%%            {'EXCEPTION', #'NotConnected'{}}
%% Note     : The client is answered right away through corba:reply/2;
%%            the function result is whatever lookup_and_push/1 returns.
%%-----------------------------------------------------------
resume_connection(_OE_THIS, OE_FROM, State) when ?is_Connected(State) ->
    if
        ?is_NotSuspended(State) ->
            corba:raise(#'CosNotifyChannelAdmin_ConnectionAlreadyActive'{});
        true ->
            corba:reply(OE_FROM, ok),
            %% start_timer/1 hands back an updated state (it is used that
            %% way in connect_sequence_push_consumer/4). Keep that state:
            %% suspend_connection/3 reset pacingTimer to undefined, so
            %% dropping the result would leave the restarted timer
            %% unrecorded and a later suspend could not stop it.
            ResumedState =
                if
                    ?is_SEQUENCE(State) ->
                        start_timer(State);
                    true ->
                        State
                end,
            lookup_and_push(?set_NotSuspended(ResumedState))
    end;
resume_connection(_,_,_) ->
    corba:raise(#'CosNotifyChannelAdmin_NotConnected'{}).

%%----- Inherit from CosNotifyChannelAdmin::ProxySupplier ---
%%----------------------------------------------------------%
%% function : obtain_offered_types
%% Arguments: Mode - enum 'ObtainInfoMode' (CosNotifyChannelAdmin)
%% Returns  : CosNotification::EventTypeSeq
%% Note     : 'ALL_NOW_*' replies with ?get_AllSubscribe(State),
%%            'NONE_NOW_*' replies with []. '*_UPDATES_ON' sets the
%%            subscribe type to true, '*_UPDATES_OFF' to false.
%%            Any other enumerant is logged and raises 'BAD_OPERATION'.
%%-----------------------------------------------------------
obtain_offered_types(_OE_THIS, _OE_FROM, State, 'ALL_NOW_UPDATES_OFF') ->
    {reply, ?get_AllSubscribe(State), ?set_SubscribeType(State, false)};
obtain_offered_types(_OE_THIS, _OE_FROM, State, 'ALL_NOW_UPDATES_ON') ->
    {reply, ?get_AllSubscribe(State), ?set_SubscribeType(State, true)};
obtain_offered_types(_OE_THIS, _OE_FROM, State, 'NONE_NOW_UPDATES_OFF') ->
    {reply, [], ?set_SubscribeType(State, false)};
obtain_offered_types(_OE_THIS, _OE_FROM, State, 'NONE_NOW_UPDATES_ON') ->
    {reply, [], ?set_SubscribeType(State, true)};
obtain_offered_types(_,_,_,What) ->
    orber:dbg("[~p] PusherSupplier:obtain_offered_types(~p);~n"
              "Bad enumerant", [?LINE, What], ?DEBUG_LEVEL),
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).

%%----------------------------------------------------------%
%% function : validate_event_qos
%% Arguments: RequiredQoS - CosNotification::QoSProperties
%% Returns  : ok | {'EXCEPTION', #'UnsupportedQoS'{}}
%%            AvailableQoS - CosNotification::NamedPropertyRangeSeq (out)
%% Note     : Validation is delegated to 'CosNotification_Common'
%%            using the local QoS of this proxy; the state is unchanged.
%%-----------------------------------------------------------
validate_event_qos(_OE_THIS, _OE_FROM, State, RequiredQoS) ->
    AvailableQoS = 'CosNotification_Common':validate_event_qos(RequiredQoS,
                                                               ?get_LocalQoS(State)),
    {reply, {ok, AvailableQoS}, State}.

%%----- Inherit from CosNotification::QoSAdmin --------------
%%----------------------------------------------------------%
%% function : get_qos
%% Arguments: 
%% Returns  : the value of ?get_GlobalQoS(State)
%%-----------------------------------------------------------
get_qos(_OE_THIS, _OE_FROM, State) ->
    {reply, ?get_GlobalQoS(State), State}.

%%----------------------------------------------------------%
%% function : set_qos
%% Arguments: QoS - CosNotification::QoSProperties, i.e.,
%%            [#'Property'{name, value}, ...] where name eq. string()
%%            and value eq.
%%            any().
%% Returns  : ok | {'EXCEPTION', CosNotification::UnsupportedQoS}
%% Note     : The new QoS is computed by 'CosNotification_Common',
%%            the event DB is updated with the local QoS and both QoS
%%            values are stored in the returned state.
%%-----------------------------------------------------------
set_qos(_OE_THIS, _OE_FROM, State, QoS) ->
    {NewQoS, LQS} = 'CosNotification_Common':set_qos(QoS, ?get_BothQoS(State),
                                                     proxy, ?get_MyAdmin(State),
                                                     false),
    NewState = ?update_EventDB(State, LQS),
    {reply, ok, ?set_BothQoS(NewState, NewQoS, LQS)}.

%%----------------------------------------------------------%
%% function : validate_qos
%% Arguments: Required_qos - CosNotification::QoSProperties
%%            [#'Property'{name, value}, ...] where name eq. string()
%%            and value eq. any().
%% Returns  : {'EXCEPTION', CosNotification::UnsupportedQoS}
%%            {ok, CosNotification::NamedPropertyRangeSeq}
%%-----------------------------------------------------------
validate_qos(_OE_THIS, _OE_FROM, State, Required_qos) ->
    QoS = 'CosNotification_Common':validate_qos(Required_qos, ?get_BothQoS(State),
                                                proxy, ?get_MyAdmin(State),
                                                false),
    {reply, {ok, QoS}, State}.

%%----- Inherit from CosNotifyComm::NotifySubscribe ---------
%%----------------------------------------------------------%
%% function : subscription_change
%% Arguments: Added   - list of #'CosNotification_EventType'{}
%%            Removed - list of #'CosNotification_EventType'{}
%% Returns  : ok | 
%%            {'EXCEPTION', #'CosNotifyComm_InvalidEventType'{}}
%%-----------------------------------------------------------
subscription_change(_OE_THIS, _OE_FROM, State, Added, Removed) ->
    cosNotification_Filter:validate_types(Added),
    cosNotification_Filter:validate_types(Removed),
    %% On this "side", we care about which type of events the client
    %% will require, since the client (or an agent) clearly stated
    %% that it's only interested in these types of events.
    %% Also see PusherConsumer- and PullerConsumer-'offer_change'.
    update_subscribe(remove, State, Removed),
    %% Read the remaining subscriptions after the removal, before adding.
    CurrentSub = ?get_AllSubscribe(State),
    NewState =
        case cosNotification_Filter:check_types(Added++CurrentSub) of
            true ->
                %% Types supplied does in some way cause all events to be valid.
                %% Smart? Would have been better to not supply any at all.
                ?set_SubscribeData(State, true);
            {ok, Which, WC} ->
                ?set_SubscribeData(State, {Which, WC})
        end,
    update_subscribe(add, NewState, Added),
    case ?get_SubscribeType(NewState) of
        true ->
            %% Perhaps we should handle exception here?!
            %% Probably not. Better to stay "on-line".
            %% Deliberately best-effort: a failing client must not
            %% take this proxy down, so the result is ignored.
            catch 'CosNotifyComm_NotifyPublish':
                offer_change(?get_Client(NewState), Added, Removed),
            ok;
        _->
            ok
    end,
    {reply, ok, NewState}.

%% Applies ?add_Subscribe or ?del_Subscribe to every listed event type,
%% depending on the first argument (add | remove). Always returns ok.
update_subscribe(_, _, [])->
    ok;
update_subscribe(add, State, [H|T]) ->
    ?add_Subscribe(State, H),
    update_subscribe(add, State, T);
update_subscribe(remove, State, [H|T]) ->
    ?del_Subscribe(State, H),
    update_subscribe(remove, State, T).

%%----- Inherit from CosNotifyFilter::FilterAdmin -----------
%%----------------------------------------------------------%
%% function : add_filter
%% Arguments: Filter - CosNotifyFilter::Filter
%% Returns  : FilterID - long
%%-----------------------------------------------------------
add_filter(_OE_THIS, _OE_FROM, State, Filter) ->
    'CosNotification_Common':type_check(Filter, 'CosNotifyFilter_Filter'),
    FilterID = ?new_Id(State),
    NewState = ?set_IdCounter(State, FilterID),
    {reply, FilterID, ?add_Filter(NewState, FilterID, Filter)}.

%%----------------------------------------------------------%
%% function : remove_filter
%% Arguments: FilterID - long
%% Returns  : ok
%% Note     : A non-integer id is logged and raises 'BAD_PARAM'.
%%-----------------------------------------------------------
%% is_integer/1 replaces the obsolete old-style guard test integer/1.
remove_filter(_OE_THIS, _OE_FROM, State, FilterID) when is_integer(FilterID) ->
    {reply, ok, ?del_Filter(State, FilterID)};
remove_filter(_,_,_,What) ->
    orber:dbg("[~p] PusherSupplier:remove_filter(~p); Not an integer",
              [?LINE, What], ?DEBUG_LEVEL),
    corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}).

%%----------------------------------------------------------%
%% function : get_filter
%% Arguments: FilterID - long
%% Returns  : Filter - CosNotifyFilter::Filter |
%%            {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}}
%% Note     : A non-integer id is logged and raises 'BAD_PARAM'.
%%-----------------------------------------------------------
%% is_integer/1 replaces the obsolete old-style guard test integer/1.
get_filter(_OE_THIS, _OE_FROM, State, FilterID) when is_integer(FilterID) ->
    {reply, ?get_Filter(State, FilterID), State};
get_filter(_,_,_,What) ->
    orber:dbg("[~p] PusherSupplier:get_filter(~p); Not an integer",
              [?LINE, What], ?DEBUG_LEVEL),
    corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}).

%%----------------------------------------------------------%
%% function : get_all_filters
%% Arguments: -
%% Returns  : Filter - CosNotifyFilter::FilterIDSeq
%%-----------------------------------------------------------
get_all_filters(_OE_THIS, _OE_FROM, State) ->
    {reply, ?get_AllFilterID(State), State}.

%%----------------------------------------------------------%
%% function : remove_all_filters
%% Arguments: -
%% Returns  : ok
%%-----------------------------------------------------------
remove_all_filters(_OE_THIS, _OE_FROM, State) ->
    {reply, ok, ?del_AllFilter(State)}.

%%----- Inherit from CosEventComm::PushSupplier -------------
%%----------------------------------------------------------%
%% function : disconnect_push_supplier
%% Arguments: -
%% Returns  : ok
%% Note     : Replies ok and stops the server with reason normal.
%%-----------------------------------------------------------
disconnect_push_supplier(_OE_THIS, _OE_FROM, State) ->
    {stop, normal, ok, ?set_Unconnected(State)}.

%%----- Inherit from CosNotifyComm::StructuredPushSupplier --
%%----------------------------------------------------------%
%% function : disconnect_structured_push_supplier
%% Arguments: -
%% Returns  : ok
%% Note     : Replies ok and stops the server with reason normal.
%%-----------------------------------------------------------
disconnect_structured_push_supplier(_OE_THIS, _OE_FROM, State) ->
    {stop, normal, ok, ?set_Unconnected(State)}.

%%----- Inherit from
%%      CosNotifyComm::SequencePushSupplier ----
%%----------------------------------------------------------%
%% function : disconnect_sequence_push_supplier
%% Arguments: -
%% Returns  : ok
%% Note     : Replies ok and stops the server with reason normal,
%%            handing back the state marked as unconnected.
%%-----------------------------------------------------------
disconnect_sequence_push_supplier(_OE_THIS, _OE_FROM, State) ->
    Unconnected = ?set_Unconnected(State),
    {stop, normal, ok, Unconnected}.

%%--------------- LOCAL FUNCTIONS ----------------------------
%% Unwraps a {value, {Key, Obj}} lookup result. Anything else is
%% answered with a FilterNotFound exception tuple (returned, not raised).
find_obj(Lookup) ->
    case Lookup of
        {value, {_, Obj}} ->
            Obj;
        _ ->
            {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}}
    end.

%% Collects the first element of every {Id, _} pair. The ids are
%% accumulated by prepending, so they come out in reverse list order.
find_ids(Pairs) ->
    find_ids(Pairs, []).

find_ids([], Ids) ->
    Ids;
find_ids([{Id, _} | Rest], Ids) ->
    find_ids(Rest, [Id | Ids]);
find_ids(What, _) ->
    %% Reached for anything that is neither [] nor a list headed by a pair.
    orber:dbg("[~p] PusherSupplier:find_ids();~n"
              "Id corrupt: ~p", [?LINE, What], ?DEBUG_LEVEL),
    corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO}).

%% Delete a single object.
%% If both lists are identical, i.e., no filter was removed, raise
%% FilterNotFound; otherwise return the first list.
delete_obj(Remaining, Original) when Remaining =:= Original ->
    corba:raise(#'CosNotifyFilter_FilterNotFound'{});
delete_obj(Remaining, _Original) ->
    Remaining.

%%-----------------------------------------------------------
%% function : callSeq
%% Arguments: 
%% Returns  : 
%%-----------------------------------------------------------
callSeq(_OE_THIS, OE_FROM, State, EventsIn, Status) ->
    corba:reply(OE_FROM, ok),
    case cosNotification_eventDB:validate_event(?get_SubscribeData(State), EventsIn,
                                                ?get_AllFilter(State),
                                                ?get_SubscribeDB(State),
                                                Status) of
        {[],_} ->

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?