pullersupplier_impl.erl
来自「OTP是开放电信平台的简称」· ERL 代码 · 共 914 行 · 第 1/3 页
ERL
914 行
%%--------------------------------------------------------------------
%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved via the world wide web at http://www.erlang.org/.
%% 
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%% 
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
%% AB. All Rights Reserved.''
%% 
%%     $Id$
%%
%%----------------------------------------------------------------------
%% File    : PullerSupplier_impl.erl
%% Purpose : gen_server callback module for the pull-style proxy
%%           suppliers (PULL_ANY, PULL_STRUCTURED and PULL_SEQUENCE).
%% Created : 20 Oct 1999
%%----------------------------------------------------------------------
-module('PullerSupplier_impl').

%%--------------- INCLUDES -----------------------------------
-include_lib("orber/include/corba.hrl").
-include_lib("orber/include/ifr_types.hrl").
%% cosEvent files.
-include_lib("cosEvent/include/CosEventChannelAdmin.hrl").
%% Application files
-include("CosNotification.hrl").
-include("CosNotifyChannelAdmin.hrl").
-include("CosNotifyComm.hrl").
-include("CosNotifyFilter.hrl").
-include("CosNotification_Definitions.hrl").

%%--------------- EXPORTS ------------------------------------
%%--------------- External -----------------------------------
%%----- CosNotifyChannelAdmin::ProxyPullSupplier -------------
-export([connect_any_pull_consumer/4]).
%%----- CosNotifyChannelAdmin::SequenceProxyPullSupplier -----
-export([connect_sequence_pull_consumer/4]).
%%----- CosNotifyChannelAdmin::StructuredProxyPullSupplier ---
-export([connect_structured_pull_consumer/4]).
%%----- Inherit from CosNotifyChannelAdmin::ProxySupplier ----
-export([obtain_offered_types/4,
         validate_event_qos/4]).
%%----- Inherit from CosNotification::QoSAdmin ---------------
-export([get_qos/3,
         set_qos/4,
         validate_qos/4]).
%%----- Inherit from CosNotifyComm::NotifySubscribe ----------
-export([subscription_change/5]).
%%----- Inherit from CosNotifyFilter::FilterAdmin ------------
-export([add_filter/4,
         remove_filter/4,
         get_filter/4,
         get_all_filters/3,
         remove_all_filters/3]).
%%----- Inherit from CosEventComm::PullSupplier --------------
-export([pull/3,
         try_pull/3,
         disconnect_pull_supplier/3]).
%%----- Inherit from CosNotifyComm::SequencePullSupplier -----
-export([pull_structured_events/4,
         try_pull_structured_events/4,
         disconnect_sequence_pull_supplier/3]).
%%----- Inherit from CosNotifyComm::StructuredPullSupplier ---
-export([pull_structured_event/3,
         try_pull_structured_event/3,
         disconnect_structured_pull_supplier/3]).
%%----- Inherit from CosEventChannelAdmin::ProxyPullSupplier -
-export([connect_pull_consumer/4]).
%% Attributes (external) CosNotifyChannelAdmin::ProxySupplier
-export(['_get_MyType'/3,
         '_get_MyAdmin'/3,
         '_get_priority_filter'/3,
         '_set_priority_filter'/4,
         '_get_lifetime_filter'/3,
         '_set_lifetime_filter'/4]).
%%--------------- Internal -----------------------------------
%%----- Inherit from cosNotificationComm ---------------------
-export([callAny/5,
         callSeq/5]).
%%--------------- gen_server specific exports ----------------
-export([handle_info/2, code_change/3]).
-export([init/1, terminate/2]).

%%--------------- LOCAL DEFINITIONS --------------------------
%% Data structures
%% Server state. Notes derived from how the fields are used in this file:
%%   myAdminPid  - compared with the pid of incoming 'EXIT' messages.
%%   client      - 'undefined' while no consumer is connected.
%%   respondTo   - 'undefined', or {ReplyTo, Max} for a pull request that
%%                 is answered later via corba:reply/2.
%%   pacingTimer - a 2-tuple whose second element is matched against the
%%                 tag carried by {pacing, Tag} messages.
%%   etsR        - ets table holding {Key, subscribe} entries.
%%   eventDB     - handle returned by cosNotification_eventDB:create_db/4.
-record(state, {myType,
                myAdmin,
                myAdminPid,
                myChannel,
                myFilters = [],
                myOperator,
                idCounter = 0,
                prioFil,
                lifetFil,
                client,
                qosGlobal,
                qosLocal,
                pacingTimer,
                respondTo,
                subscribeType = false,
                subscribeData = true,
                etsR,
                eventDB}).

%% Data structures constructors
%% Builds the initial #state{}; creates the subscription ets table and
%% the event database as a side effect of the expansion.
-define(get_InitState(_MyT, _MyA, _MyAP, _QS, _LQS, _Ch, _MyOp, _GT, _GL, _TR),
        #state{myType    = _MyT,
               myAdmin   = _MyA,
               myAdminPid= _MyAP,
               myChannel = _Ch,
               myOperator= _MyOp,
               qosGlobal = _QS,
               qosLocal  = _LQS,
               etsR      = ets:new(oe_ets, [set, protected]),
               eventDB   = cosNotification_eventDB:create_db(_LQS, _GT, _GL, _TR)}).

%% Data structures selectors
%% Attributes
-define(get_MyType(S),           S#state.myType).
-define(get_MyAdmin(S),          S#state.myAdmin).
%% Must read myAdminPid (not myAdmin): handle_info/2 compares this value
%% with the pid carried by 'EXIT' messages to detect that the parent
%% admin terminated.
-define(get_MyAdminPid(S),       S#state.myAdminPid).
-define(get_MyChannel(S),        S#state.myChannel).
-define(get_MyOperator(S),       S#state.myOperator).
-define(get_PrioFil(S),          S#state.prioFil).
-define(get_LifeTFil(S),         S#state.lifetFil).
%% Client Object
-define(get_Client(S),           S#state.client).
%% QoS
-define(get_GlobalQoS(S),        S#state.qosGlobal).
-define(get_LocalQoS(S),         S#state.qosLocal).
-define(get_BothQoS(S),          {S#state.qosGlobal, S#state.qosLocal}).
%% Filters
-define(get_Filter(S, I),        find_obj(lists:keysearch(I, 1, S#state.myFilters))).
-define(get_AllFilter(S),        S#state.myFilters).
-define(get_AllFilterID(S),      find_ids(S#state.myFilters)).
%% Event
-define(get_Event(S),            cosNotification_eventDB:get_event(S#state.eventDB)).
-define(get_Events(S,M),         cosNotification_eventDB:get_events(S#state.eventDB, M)).
-define(get_RespondTo(S),        S#state.respondTo).
%% Admin
-define(get_PacingTimer(S),      S#state.pacingTimer).
%% NOTE(review): the division by 10000000 presumably converts 100 ns
%% units to seconds - confirm against ?not_GetPacingInterval.
-define(get_PacingInterval(S),   round(?not_GetPacingInterval((S#state.qosLocal))/10000000)).
-define(get_BatchLimit(S),       ?not_GetMaximumBatchSize((S#state.qosLocal))).
%% Subscribe
-define(get_AllSubscribe(S),     lists:flatten(ets:match(S#state.etsR, {'$1',subscribe}))).
-define(get_SubscribeType(S),    S#state.subscribeType).
-define(get_SubscribeData(S),    S#state.subscribeData).
%% ID
-define(get_IdCounter(S),        S#state.idCounter).
-define(get_SubscribeDB(S),      S#state.etsR).

%% Data structures modifiers
%% Attributes
-define(set_PrioFil(S,D),        S#state{prioFil=D}).
-define(set_LifeTFil(S,D),       S#state{lifetFil=D}).
%% Client Object
-define(set_Client(S,D),         S#state{client=D}).
-define(del_Client(S),           S#state{client=undefined}).
%% QoS
-define(set_LocalQoS(S,D),       S#state{qosLocal=D}).
-define(set_GlobalQoS(S,D),      S#state{qosGlobal=D}).
-define(set_BothQoS(S,GD,LD),    S#state{qosGlobal=GD, qosLocal=LD}).
-define(update_EventDB(S,Q),
        S#state{eventDB=
                cosNotification_eventDB:update(S#state.eventDB, Q)}).
%% Filters
-define(add_Filter(S,I,O),       S#state{myFilters=[{I,O}|S#state.myFilters]}).
-define(del_Filter(S,I),
        S#state{myFilters=
                delete_obj(lists:keydelete(I, 1, S#state.myFilters),
                           S#state.myFilters)}).
-define(del_AllFilter(S),        S#state{myFilters=[]}).
-define(set_Unconnected(S),      S#state{client=undefined}).
-define(reset_RespondTo(S),      S#state{respondTo=undefined}).
-define(set_RespondTo(S,F),      S#state{respondTo=F}).
%% Event
-define(add_Event(S,E),
        catch cosNotification_eventDB:add_event(S#state.eventDB, E,
                                                S#state.lifetFil,
                                                S#state.prioFil)).
-define(addAndGet_Event(S,E),
        catch cosNotification_eventDB:add_and_get_event(S#state.eventDB, E,
                                                        S#state.lifetFil,
                                                        S#state.prioFil)).
%% Admin
-define(set_PacingTimer(S,T),    S#state{pacingTimer=T}).
%% Subscribe
-define(add_Subscribe(S,E),      ets:insert(S#state.etsR, {E, subscribe})).
-define(del_Subscribe(S,E),      ets:delete(S#state.etsR, E)).
-define(set_SubscribeType(S,T),  S#state{subscribeType=T}).
-define(set_SubscribeData(S,D),  S#state{subscribeData=D}).
%% ID
-define(set_IdCounter(S,V),      S#state{idCounter=V}).
-define(new_Id(S),               'CosNotification_Common':create_id(S#state.idCounter)).

%% MISC
-define(is_ANY(S),               S#state.myType == 'PULL_ANY').
-define(is_STRUCTURED(S),        S#state.myType == 'PULL_STRUCTURED').
-define(is_SEQUENCE(S),          S#state.myType == 'PULL_SEQUENCE').
-define(is_ANDOP(S),             S#state.myOperator == 'AND_OP').
-define(is_UnConnected(S),       S#state.client == undefined).
-define(is_Connected(S),         S#state.client =/= undefined).
-define(is_Waiting(S),           S#state.respondTo =/= undefined).
-define(is_SubscribedFor(S,K),   ets:lookup(S#state.etsR, K) =/= []).
-define(is_BatchLimitReached(S,M),
        cosNotification_eventDB:status(S#state.eventDB,
                                       {batchLimit,
                                        ?not_GetMaximumBatchSize((S#state.qosLocal)),
                                        M})).

%%----------------------------------------------------------%
%% function : handle_info, code_change
%% Arguments: 
%% Returns  : 
%% Effect   : Functions
%%            demanded by the gen_server module. 
%%-----------------------------------------------------------
%% Code upgrade hook: the state is carried over unchanged.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% Handles raw (non-call) messages sent to this server:
%%   {'EXIT', Pid, Reason} - exit signals arrive as messages because
%%       init/1 sets trap_exit. When Pid equals ?get_MyAdminPid(State)
%%       the server stops with the same Reason; exits from any other
%%       process are ignored.
%%   {pacing, TS} - pacing timer tick. It is honoured only while a pull
%%       request is pending (respondTo is set) and TS matches the second
%%       element of the stored pacing timer; the pending request is then
%%       answered with at most Max events and respondTo is cleared.
%%   anything else is dropped.
handle_info(Info, State) ->
    ?DBG("INFO: ~p~n", [Info]),
    case Info of
        {'EXIT', Pid, Reason} when ?get_MyAdminPid(State)==Pid->
            ?DBG("PARENT ADMIN: ~p TERMINATED.~n",[Reason]),
            {stop, Reason, State};
        {'EXIT', _Pid, _Reason} ->
            ?DBG("PROXYPUSHSUPPLIER: ~p TERMINATED.~n",[_Reason]),
            {noreply, State};
        {pacing, TS} when ?is_Waiting(State) ->
            case ?get_PacingTimer(State) of
                {_, TS} ->
                    ?DBG("PULL SUPPLIER PACING LIMIT REACHED~n",[]),
                    {RespondTo, Max} = ?get_RespondTo(State),
                    {EventSeq, _}    = ?get_Events(State, Max),
                    corba:reply(RespondTo, EventSeq),
                    {noreply, ?reset_RespondTo(State)};
                _ ->
                    %% Must have been an old timer event, i.e., we reached the
                    %% Batch Limit before Pace limit and we were not able
                    %% to stop the timer before it triggered an event.
                    ?DBG("PULL SUPPLIER OLD PACING LIMIT REACHED~n",[]),
                    {noreply, State}
            end;
        {pacing, _} ->
            ?DBG("PULL SUPPLIER PACING LIMIT REACHED BUT NO CLIENT; IMPOSSIBLE!!!~n",[]),
            {noreply, State};
        _ ->
            {noreply, State}
    end.

%%----------------------------------------------------------%
%% function : init, terminate
%% Arguments: 
%%-----------------------------------------------------------
%% Starts trapping exits, reads the garbage-collection and time-service
%% options from Options and builds the initial state (which also creates
%% the subscription ets table and the event database).
init([MyType, MyAdmin, MyAdminPid, InitQoS, LQS, MyChannel, Options, Operator]) ->
    process_flag(trap_exit, true),
    GCTime = 'CosNotification_Common':get_option(gcTime, Options,
                                                 ?not_DEFAULT_SETTINGS),
    %% The limit is a separate option; reading gcTime here as well would
    %% silently ignore a configured gcLimit.
    GCLimit = 'CosNotification_Common':get_option(gcLimit, Options,
                                                  ?not_DEFAULT_SETTINGS),
    TimeRef = 'CosNotification_Common':get_option(timeService, Options,
                                                  ?not_DEFAULT_SETTINGS),
    {ok, ?get_InitState(MyType, MyAdmin, MyAdminPid,
                        InitQoS, LQS, MyChannel, Operator,
                        GCTime, GCLimit, TimeRef)}.

%% On shutdown, tells a still-connected client that it has been
%% disconnected, using the disconnect operation that corresponds to this
%% proxy's type. Nothing is done when no client is connected, when the
%% client reference is nil, or when the nil check itself fails (the
%% 'catch' makes this best effort).
terminate(_Reason, State) when ?is_UnConnected(State) ->
    ok;
terminate(_Reason, State) ->
    Client = ?get_Client(State),
    case catch corba_object:is_nil(Client) of
        false when ?is_ANY(State) ->
            'CosNotification_Common':disconnect('CosEventComm_PullConsumer',
                                                disconnect_pull_consumer,
                                                Client);
        false when ?is_SEQUENCE(State) ->
            'CosNotification_Common':disconnect('CosNotifyComm_SequencePullConsumer',
                                                disconnect_sequence_pull_consumer,
                                                Client);
        false when ?is_STRUCTURED(State) ->
            'CosNotification_Common':disconnect('CosNotifyComm_StructuredPullConsumer',
                                                disconnect_structured_pull_consumer,
                                                Client);
        _ ->
            ok
    end.

%%-----------------------------------------------------------
%%----- CosNotifyChannelAdmin_ProxySupplier attributes ------
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?