pusherconsumer_impl.erl

来自「OTP是开放电信平台的简称」· ERL 代码 · 共 729 行 · 第 1/2 页

ERL
729
字号
%%--------------------------------------------------------------------%% ``The contents of this file are subject to the Erlang Public License,%% Version 1.1, (the "License"); you may not use this file except in%% compliance with the License. You should have received a copy of the%% Erlang Public License along with this software. If not, it can be%% retrieved via the world wide web at http://www.erlang.org/.%% %% Software distributed under the License is distributed on an "AS IS"%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See%% the License for the specific language governing rights and limitations%% under the License.%% %% The Initial Developer of the Original Code is Ericsson Utvecklings AB.%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings%% AB. All Rights Reserved.''%% %%     $Id$%%%%----------------------------------------------------------------------%% File    : PusherConsumer_impl.erl%% Purpose : %% Created : 20 Oct 1999%%-----------------------------------------------------------------------module('PusherConsumer_impl').%%--------------- INCLUDES ------------------------------------include_lib("orber/include/corba.hrl").-include_lib("orber/include/ifr_types.hrl").%% cosEvent files.-include_lib("cosEvent/include/CosEventChannelAdmin.hrl").-include_lib("cosEvent/include/CosEventComm.hrl").%% Application files-include("CosNotification.hrl").-include("CosNotifyChannelAdmin.hrl").-include("CosNotifyComm.hrl").-include("CosNotifyFilter.hrl").-include("CosNotification_Definitions.hrl").%%--------------- EXPORTS ------------------------------------%%--------------- External -----------------------------------%%----- CosNotifyChannelAdmin::ProxyPushConsumer --------------export([connect_any_push_supplier/4]).%%----- CosNotifyChannelAdmin::SequenceProxyPushConsumer ------export([connect_sequence_push_supplier/4]).%%----- CosNotifyChannelAdmin::StructuredProxyPushConsumer ----export([connect_structured_push_supplier/4]).%%----- Inherit from CosNotifyChannelAdmin::ProxyConsumer -----export([obtain_subscription_types/4,	 validate_event_qos/4]).%%----- Inherit from CosNotification::QoSAdmin ----------------export([get_qos/3,	 set_qos/4,	 validate_qos/4]).%%----- Inherit from CosNotifyComm::NotifyPublish -------------export([offer_change/5]).%%----- Inherit from CosNotifyFilter::FilterAdmin -------------export([add_filter/4, 	 remove_filter/4, 	 get_filter/4,	 get_all_filters/3, 	 remove_all_filters/3]).%%----- Inherit from CosEventComm::PushConsumer --------------export([push/4,	 disconnect_push_consumer/3]).%%----- Inherit from CosNotifyComm::SequencePushConsumer -----export([push_structured_events/4,	 disconnect_sequence_push_consumer/3]).%%----- Inherit from CosNotifyComm::StructuredPushConsumer ---export([push_structured_event/4,	 disconnect_structured_push_consumer/3]).%%----- Inherit from CosEventChannelAdmin::ProxyPushConsumer-export([connect_push_supplier/4]).%% Attributes (external) CosNotifyChannelAdmin::ProxySupplier-export(['_get_MyType'/3, 	 '_get_MyAdmin'/3]).%%--------------- gen_server specific exports -----------------export([handle_info/2, code_change/3]).-export([init/1, terminate/2]).%%--------------- LOCAL DEFINITIONS --------------------------%% Data structures-record(state, {myType,	        myAdmin,	        myAdminPid,		myChannel,		myFilters = [],		myOperator,		idCounter = 0,		client,		qosGlobal,		qosLocal,		publishType = false,		etsR,		eventDB}).%% Data structures constructors-define(get_InitState(_MyT, _MyA, _MyAP, _QS, _LQS, _Ch, _EDB, _MyOP), 	#state{myType    = _MyT,	       myAdmin   = _MyA,	       myAdminPid= _MyAP,	       myChannel = _Ch,	       myOperator= _MyOP,	       qosGlobal = _QS,	       qosLocal  = _LQS,	       etsR      = ets:new(oe_ets, [set, protected]),	       eventDB   = _EDB}).%%-------------- Data structures selectors -----------------%% Attributes-define(get_MyType(S),           S#state.myType).-define(get_MyAdmin(S),          S#state.myAdmin).-define(get_MyAdminPid(S),       S#state.myAdminPid).-define(get_MyChannel(S),        S#state.myChannel).-define(get_MyOperator(S),       S#state.myOperator).%% Client Object-define(get_Client(S),           S#state.client).%% QoS-define(get_GlobalQoS(S),        S#state.qosGlobal).-define(get_LocalQoS(S),         S#state.qosLocal).-define(get_BothQoS(S),          {S#state.qosGlobal, S#state.qosLocal}).%% Filters-define(get_Filter(S, I),        find_obj(lists:keysearch(I, 1, S#state.myFilters))).-define(get_AllFilter(S),        S#state.myFilters).-define(get_AllFilterID(S),      find_ids(S#state.myFilters)).%% Publish-define(get_AllPublish(S),       lists:flatten(ets:match(S#state.etsR,							 {'$1',publish}))).-define(get_PublishType(S),      S#state.publishType).%% ID-define(get_IdCounter(S),        S#state.idCounter).%% Event-define(get_Event(S),            cosNotification_eventDB:get_event(S#state.eventDB)).-define(get_Events(S,M),         cosNotification_eventDB:get_events(S#state.eventDB, M)).-define(get_EventCounter(S),     S#state.eventCounter).%% Admin-define(get_BatchLimit(S),       ?not_GetMaximumBatchSize((S#state.qosLocal))).%%-------------- Data structures modifiers -----------------%% Client Object-define(set_Client(S,D),         S#state{client=D}).-define(del_Client(S),           S#state{client=undefined}).-define(set_Unconnected(S),      S#state{client=undefined}).%% QoS-define(set_LocalQoS(S,D),       S#state{qosLocal=D}).-define(set_GlobalQoS(S,D),      S#state{qosGlobal=D}).-define(set_BothQoS(S,GD,LD),    S#state{qosGlobal=GD, qosLocal=LD}).%% Filters-define(add_Filter(S,I,O),       S#state{myFilters=[{I,O}|S#state.myFilters]}).-define(del_Filter(S,I),         S#state{myFilters=					 delete_obj(lists:keydelete(I, 1, S#state.myFilters),						    S#state.myFilters)}).-define(del_AllFilter(S),        S#state{myFilters=[]}).%% Publish-define(add_Publish(S,E),        ets:insert(S#state.etsR, {E, publish})).-define(del_Publish(S,E),        ets:delete(S#state.etsR, E)).-define(set_PublishType(S,T),    S#state{publishType=T}).%% ID-define(set_IdCounter(S,V),      S#state{idCounter=V}).-define(new_Id(S),               'CosNotification_Common':create_id(S#state.idCounter)).%% Event-define(add_Event(S,E),          cosNotification_eventDB:add_event(S#state.eventDB, E)).-define(update_EventDB(S,Q),     S#state{eventDB=					 cosNotification_eventDB:update(S#state.eventDB, Q)}).-define(set_EventCounter(S,V),   S#state{eventCounter=V}).-define(add_to_EventCounter(S,V),S#state{eventCounter=S#state.eventCounter+V}).-define(reset_EventCounter(S),   S#state{eventCounter=0}).-define(increase_EventCounter(S),S#state{eventCounter=(S#state.eventCounter+1)}).-define(decrease_EventCounter(S),S#state{eventCounter=S#state.eventCounter-1}).-define(add_ToEventCounter(S,A), S#state{eventCounter=(S#state.eventCounter+A)}).-define(sub_FromEventCounter(S,_A), S#state{eventCounter=(S#state.eventCounter-_A)}).-define(set_EventCounterTo(S,V), S#state{eventCounter=V}).%% MISC-define(is_ANY(S),               S#state.myType == 'PUSH_ANY').-define(is_STRUCTURED(S),        S#state.myType == 'PUSH_STRUCTURED').-define(is_SEQUENCE(S),          S#state.myType == 'PUSH_SEQUENCE').-define(is_ANDOP(S),             S#state.myOperator == 'AND_OP').-define(is_UnConnected(S),       S#state.client == undefined).-define(is_Connected(S),         S#state.client =/= undefined).-define(is_PersistentConnection(S),	?not_GetConnectionReliability((S#state.qosLocal)) == ?not_Persistent).-define(is_PersistentEvent(S),	?not_GetEventReliability((S#state.qosLocal)) == ?not_Persistent).-define(is_BatchLimitReached(S), S#state.eventCounter >=                                  ?not_GetMaximumBatchSize((S#state.qosLocal))).%%----------------------------------------------------------%%% function : handle_info, code_change%% Arguments: %% Returns  : %% Effect   : Functions demanded by the gen_server module. %%-----------------------------------------------------------code_change(_OldVsn, State, _Extra) ->    {ok, State}.handle_info(Info, State) ->    ?DBG("INFO: ~p~n", [Info]),    case Info of        {'EXIT', Pid, Reason} when ?get_MyAdminPid(State)==Pid->            ?DBG("PARENT ADMIN: ~p  TERMINATED.~n",[Reason]),	    {stop, Reason, State};        {'EXIT', _Pid, _Reason} ->            ?DBG("PROXYPUSHCONSUMER: ~p  TERMINATED.~n",[_Reason]),            {noreply, State};        _ ->            {noreply, State}    end.%%----------------------------------------------------------%%% function : init, terminate%% Arguments: %%-----------------------------------------------------------init(['PUSH_SEQUENCE', MyAdmin, MyAdminPid, InitQoS, LQS,       MyChannel, Options, Operator]) ->    process_flag(trap_exit, true),    %% Only if MyType is 'PUSH_SEQUENCE' we need an ets to store events in.    %% Otherwise we'll forward them at once. Why? We don't know when the next event    %% is due.    GCTime = 'CosNotification_Common':get_option(gcTime, Options, 						 ?not_DEFAULT_SETTINGS),    GCLimit = 'CosNotification_Common':get_option(gcTime, Options, 						  ?not_DEFAULT_SETTINGS),    TimeRef = 'CosNotification_Common':get_option(timeService, Options, 						  ?not_DEFAULT_SETTINGS),    {ok, ?get_InitState('PUSH_SEQUENCE', MyAdmin, MyAdminPid, InitQoS, LQS, MyChannel,			cosNotification_eventDB:create_db(LQS, GCTime, GCLimit, TimeRef), 			Operator)};init([MyType, MyAdmin, MyAdminPid, InitQoS, LQS, MyChannel, _Options, Operator]) ->    process_flag(trap_exit, true),    {ok, ?get_InitState(MyType, MyAdmin, MyAdminPid, InitQoS, LQS, MyChannel, 			undefined, Operator)}.terminate(_Reason, State) when ?is_UnConnected(State) ->    ok;terminate(_Reason, State) ->    Client = ?get_Client(State),    case catch corba_object:is_nil(Client) of	false when ?is_ANY(State) ->	    'CosNotification_Common':disconnect('CosEventComm_PushSupplier', 						disconnect_push_supplier,						Client);	false when ?is_SEQUENCE(State) ->	    'CosNotification_Common':disconnect('CosNotifyComm_SequencePushSupplier',						disconnect_sequence_push_supplier,						Client);	false when ?is_STRUCTURED(State) ->	    'CosNotification_Common':disconnect('CosNotifyComm_StructuredPushSupplier',						disconnect_structured_push_supplier,						Client);	_ ->	    ok    end.%%-----------------------------------------------------------%%----- CosNotifyChannelAdmin_ProxyConsumer attributes ------%%-----------------------------------------------------------%%----------------------------------------------------------%%% Attribute: '_get_MyType'%% Type     : readonly%% Returns  : %%-----------------------------------------------------------'_get_MyType'(_OE_THIS, _OE_FROM, State) ->    {reply, ?get_MyType(State), State}.%%----------------------------------------------------------%%% Attribute: '_get_MyAdmin'%% Type     : readonly%% Returns  : %%-----------------------------------------------------------'_get_MyAdmin'(_OE_THIS, _OE_FROM, State) ->    {reply, ?get_MyAdmin(State), State}.%%-----------------------------------------------------------%%------- Exported external functions -----------------------%%-----------------------------------------------------------%%----- CosEventChannelAdmin::ProxyPushConsumer -------------%%----------------------------------------------------------%%% function : connect_push_supplier%% Arguments: Client - CosEventComm::PushSupplier%% Returns  :  ok | {'EXCEPTION', #'AlreadyConnected'{}} |%%            {'EXCEPTION', #'TypeError'{}}%%            Both exceptions from CosEventChannelAdmin!!!%%-----------------------------------------------------------connect_push_supplier(OE_THIS, OE_FROM, State, Client) ->    connect_any_push_supplier(OE_THIS, OE_FROM, State, Client).%%----- CosNotifyChannelAdmin::ProxyPushConsumer ------------%%----------------------------------------------------------%%% function : connect_any_push_supplier%% Arguments: Client - CosEventComm::PushSupplier%% Returns  :  ok | {'EXCEPTION', #'AlreadyConnected'{}} |%%            {'EXCEPTION', #'TypeError'{}}%%            Both exceptions from CosEventChannelAdmin!!!%%-----------------------------------------------------------connect_any_push_supplier(_OE_THIS, _OE_FROM, State, Client) when ?is_ANY(State) ->    ?not_TypeCheck(Client, 'CosEventComm_PushSupplier'),    if	?is_Connected(State) ->	    corba:raise(#'CosEventChannelAdmin_AlreadyConnected'{});	true ->	    {reply, ok, ?set_Client(State, Client)}    end;connect_any_push_supplier(_, _, _, _) ->    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).%%----- CosNotifyChannelAdmin::SequenceProxyPushConsumer ----%%----------------------------------------------------------%%% function : connect_sequence_push_supplier%% Arguments: Client - CosNotifyComm::SequencePushSupplier%% Returns  :  ok | {'EXCEPTION', #'AlreadyConnected'{}} |%%            {'EXCEPTION', #'TypeError'{}}%%-----------------------------------------------------------connect_sequence_push_supplier(_OE_THIS, _OE_FROM, State, Client) when ?is_SEQUENCE(State) ->    ?not_TypeCheck(Client, 'CosNotifyComm_SequencePushSupplier'),     if	?is_Connected(State) ->	    corba:raise(#'CosEventChannelAdmin_AlreadyConnected'{});	true ->	    {reply, ok, ?set_Client(State, Client)}    end;connect_sequence_push_supplier(_, _, _, _) ->    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).%%----- CosNotifyChannelAdmin::StructuredProxyPushConsumer --%%----------------------------------------------------------%%% function : connect_structured_push_supplier%% Arguments: Client - CosNotifyComm::StructuredPushSupplier%% Returns  :  ok | {'EXCEPTION', #'AlreadyConnected'{}} |%%            {'EXCEPTION', #'TypeError'{}}%%-----------------------------------------------------------connect_structured_push_supplier(_OE_THIS, _OE_FROM, State, Client) when ?is_STRUCTURED(State) ->    ?not_TypeCheck(Client, 'CosNotifyComm_StructuredPushSupplier'),     if	?is_Connected(State) ->	    corba:raise(#'CosEventChannelAdmin_AlreadyConnected'{});	true ->	    {reply, ok, ?set_Client(State, Client)}    end;connect_structured_push_supplier(_, _, _, _) ->    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).%%----- Inherit from CosNotifyChannelAdmin::ProxyConsumer ---%%----------------------------------------------------------%%% function : obtain_subscription_types%% Arguments: Mode - enum 'ObtainInfoMode' (CosNotifyChannelAdmin)%% Returns  : CosNotification::EventTypeSeq

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?