📄 genericpeergroup.java
字号:
platformConfig.setPeerID(IDFactory.newPeerID((PeerGroupID) assignedID)); } } peerAdvertisement.setPeerID(platformConfig.getPeerID()); peerAdvertisement.setName(platformConfig.getName()); peerAdvertisement.setDesc(platformConfig.getDesc()); } else { if (null == parentGroup) { // If we did not get a valid peer id, we'll initialize it here. peerAdvertisement.setPeerID(IDFactory.newPeerID((PeerGroupID) assignedID)); } else { // We're not the world peer group, which is the authoritative source of these values. peerAdvertisement.setPeerID(parentGroup.getPeerAdvertisement().getPeerID()); peerAdvertisement.setName(parentGroup.getPeerAdvertisement().getName()); peerAdvertisement.setDesc(parentGroup.getPeerAdvertisement().getDesc()); } } if (peerGroupAdvertisement == null) { // No existing gadv. OK then we're creating the group or we're // the platform, it seems. Start a grp adv with the essentials // that we know. peerGroupAdvertisement = (PeerGroupAdvertisement) AdvertisementFactory.newAdvertisement(PeerGroupAdvertisement.getAdvertisementType()); peerGroupAdvertisement.setPeerGroupID((PeerGroupID) assignedID); peerGroupAdvertisement.setModuleSpecID(implAdvertisement.getModuleSpecID()); } else { published = true; } // If we still do not have a config adv, make one with the minimal info in it. // All groups but the Platform and the netPG are in that case. // In theory a plain ConfigParams would be enough for subgroups. But for now // GenericPeerGroup always has a full Platformconfig and there is no other concrete // ConfigParams subclass. if (configAdvertisement == null) { PlatformConfig conf = (PlatformConfig) AdvertisementFactory.newAdvertisement(PlatformConfig.getAdvertisementType()); conf.setPeerID(peerAdvertisement.getPeerID()); conf.setName(peerAdvertisement.getName()); conf.setDesc(peerAdvertisement.getDesc()); configAdvertisement = conf; } // Merge service params with those specified by the group (if any). The only // policy, right now, is to give peer params the precedence over group params. Hashtable grpParams = peerGroupAdvertisement.getServiceParams(); Enumeration keys = grpParams.keys(); while (keys.hasMoreElements()) { ID key = (ID) keys.nextElement(); Element e = (Element) grpParams.get(key); if (configAdvertisement.getServiceParam(key) == null) { configAdvertisement.putServiceParam(key, e); } } /* * Now seems like the right time to attempt to register the group. * The only trouble is that it could cause the group to * be used before all the services are initialized, but on the * other hand, we do not want to let a redundant group go through * it's service initialization because that would cause irreparable * damage to the legitimate instance. There should be a synchro on * on the get<service>() and lookupService() routines. */ if (!globalRegistry.registerInstance((PeerGroupID) assignedID, this)) { throw new PeerGroupException("Group already instantiated"); } } catch (Throwable any) { if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) { LOG.log(Level.SEVERE, "Group init failed", any); } if (any instanceof Error) { throw (Error) any; } else if (any instanceof RuntimeException) { throw (RuntimeException) any; } else if (any instanceof PeerGroupException) { throw (PeerGroupException) any; } throw new PeerGroupException("Group init failed", any); } ThreadGroup parentThreadGroup = (null != this.parentGroup) ? parentGroup.getHomeThreadGroup() : Thread.currentThread().getThreadGroup(); threadGroup = new ThreadGroup(parentThreadGroup, "Group " + peerGroupAdvertisement.getPeerGroupID()); taskQueue = new ArrayBlockingQueue<Runnable>(COREPOOLSIZE * 2); threadPool = new ThreadPoolExecutor(COREPOOLSIZE, MAXPOOLSIZE, KEEPALIVETIME, TimeUnit.SECONDS, taskQueue, new PeerGroupThreadFactory("Executor", getHomeThreadGroup()), new CallerBlocksPolicy()); // Try to allow core threads to idle out. (Requires a 1.6 method) try { Method allowCoreThreadTimeOut = threadPool.getClass().getMethod("allowCoreThreadTimeOut", boolean.class); allowCoreThreadTimeOut.invoke(threadPool, Boolean.TRUE); } catch(Throwable ohWell) { // Our attempt failed. if (Logging.SHOW_FINEST && LOG.isLoggable(Level.FINEST)) { LOG.log(Level.FINEST, "Failed to enable 'allowCoreThreadTimeOut'", ohWell); } } scheduledExecutor = new ScheduledThreadPoolExecutor(1, new PeerGroupThreadFactory("Scheduled Executor", getHomeThreadGroup())); /* * The rest of construction and initialization are left to the * group subclass, between here and the begining for initLast. * That should include instanciating and setting the endpoint, and * finally supplying it with endpoint protocols. * That also includes instanciating the appropriate services * and registering them. * For an example, see the StdPeerGroup class. */ } /** * Perform all initialization steps that need to be performed * after any subclass initialization is performed. * <p/> * Classes that override this method should always call super.initLast * <strong>after</strong> doing any of their own work. * @throws PeerGroupException if a group initialization error occurs */ protected void initLast() throws PeerGroupException { if (Logging.SHOW_CONFIG && LOG.isLoggable(Level.CONFIG)) { StringBuilder configInfo = new StringBuilder("Configuring Group : " + getPeerGroupID()); if (implAdvertisement != null) { configInfo.append("\n\tImplementation :"); configInfo.append("\n\t\tModule Spec ID: ").append(implAdvertisement.getModuleSpecID()); configInfo.append("\n\t\tImpl Description : ").append(implAdvertisement.getDescription()); configInfo.append("\n\t\tImpl URI : ").append(implAdvertisement.getUri()); configInfo.append("\n\t\tImpl Code : ").append(implAdvertisement.getCode()); } configInfo.append("\n\tGroup Params :"); configInfo.append("\n\t\tModule Spec ID : ").append(implAdvertisement.getModuleSpecID()); configInfo.append("\n\t\tPeer Group ID : ").append(getPeerGroupID()); configInfo.append("\n\t\tGroup Name : ").append(getPeerGroupName()); configInfo.append("\n\t\tPeer ID in Group : ").append(getPeerID()); configInfo.append("\n\tConfiguration :"); if (null == parentGroup) { configInfo.append("\n\t\tHome Group : (none)"); } else { configInfo.append("\n\t\tHome Group : \"").append(parentGroup.getPeerGroupName()).append("\" / ").append( parentGroup.getPeerGroupID()); } configInfo.append("\n\t\tServices :"); for (Map.Entry<ModuleClassID, Service> anEntry : services.entrySet()) { ModuleClassID aMCID = anEntry.getKey(); ModuleImplAdvertisement anImplAdv = (ModuleImplAdvertisement) anEntry.getValue().getImplAdvertisement(); configInfo.append("\n\t\t\t").append(aMCID).append("\t").append(anImplAdv.getDescription()); } LOG.config(configInfo.toString()); } } /** * {@inheritDoc} */ public int startApp(String[] arg) { return Module.START_OK; } /** * {@inheritDoc} * <p/> * PeerGroupInterface's stopApp() does nothing. Only a real reference to the * group object permits to stop it without going through ref counting. */ public void stopApp() { stopping = true; Collection<ModuleClassID> allServices = new ArrayList<ModuleClassID>(services.keySet()); // Stop and remove all remaining services. for (ModuleClassID aService : allServices) { try { removeService(aService); } catch (Exception failure) { LOG.log(Level.WARNING, "Failure shutting down service : " + aService, failure); } } if (!services.isEmpty()) { LOG.warning(services.size() + " services could not be shut down during peer group stop."); } // remove everything (just in case); services.clear(); globalRegistry.unRegisterInstance(peerGroupAdvertisement.getPeerGroupID(), this); // Explicitly unreference our parent group in order to allow it // to terminate if this group object was itself the last reference // to it. if (parentGroup != null) { parentGroup.unref(); parentGroup = null; } // shutdown the threadpool threadPool.shutdownNow(); scheduledExecutor.shutdownNow(); // No longer initialized. initComplete = false; } /** * {@inheritDoc} * <p/> * May be called by a module which has a direct reference to the group * object and wants to notify its abandoning it. Has no effect on the real * group object. */ public void unref() {} /** * Called every time an interface object that refers to this group * goes away, either by being finalized or by its unref() method being * invoked explicitly. */ protected void decRefCount() { synchronized (this) { --masterRefCount; if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { Throwable trace = new Throwable("Stack Trace"); StackTraceElement elements[] = trace.getStackTrace(); LOG.info("[" + getPeerGroupID() + "] GROUP REF COUNT DECCREMENTED TO: " + masterRefCount + " by\n\t" + elements[2]); } if (masterRefCount != 0) { return; } if (!stopWhenUnreferenced) { return; } } if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { LOG.info("[" + getPeerGroupID() + "] STOPPING UNREFERENCED GROUP"); } stopApp(); masterRefCount = Integer.MIN_VALUE; } /* * Implement the Service API so that we can make groups services when we * decide to. */ /** * {@inheritDoc} */ public Service getInterface() { synchronized (this) { ++masterRefCount; if (masterRefCount < 1) { throw new IllegalStateException("Group has been shutdown. getInterface() is not available"); } if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) { Throwable trace = new Throwable("Stack Trace"); StackTraceElement elements[] = trace.getStackTrace(); LOG.info("[" + getPeerGroupID() + "] GROUP REF COUNT INCREMENTED TO: " + masterRefCount + " by\n\t" + elements[2]); } if (initComplete) { // If init is complete the group can become sensitive to // its ref count reaching zero. Before there could be // transient references before there is a chance to give // a permanent reference to the invoker of newGroup. stopWhenUnreferenced = true; } } return new RefCountPeerGroupInterface(this); } /** * {@inheritDoc} */ public PeerGroup getWeakInterface() { return new PeerGroupInterface(this); } /** * {@inheritDoc} */ public ModuleImplAdvertisement getImplAdvertisement() { return implAdvertisement.clone(); } /** * {@inheritDoc} */ public void publishGroup(String name, String description) throws IOException { if (published) { return; } peerGroupAdvertisement.setName(name); peerGroupAdvertisement.setDescription(description); if (parentGroup == null) { return; } DiscoveryService parentDiscovery = parentGroup.getDiscoveryService(); if (null == parentDiscovery) { return; } parentDiscovery.publish(peerGroupAdvertisement, DEFAULT_LIFETIME, DEFAULT_EXPIRATION); published = true; } /** * {@inheritDoc} */ public PeerGroup newGroup(Advertisement pgAdv) throws PeerGroupException { PeerGroupAdvertisement adv = (PeerGroupAdvertisement) pgAdv; PeerGroupID gid = adv.getPeerGroupID();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -