📄 communicatorserver.java
字号:
throw new IllegalStateException("Stop server before " + "carrying out this operation"); } final String error = "MBeanServer argument must be MBean server where this " + "server is registered, or an MBeanServerForwarder " + "leading to that server"; Vector seenMBS = new Vector(); for (MBeanServer mbs = newMBS; mbs != bottomMBS; mbs = ((MBeanServerForwarder) mbs).getMBeanServer()) { if (!(mbs instanceof MBeanServerForwarder)) throw new IllegalArgumentException(error); if (seenMBS.contains(mbs)) throw new IllegalArgumentException("MBeanServerForwarder " + "loop"); seenMBS.addElement(mbs); } topMBS = newMBS; } // // To be called by the subclass if needed // /** * For internal use only. */ ObjectName getObjectName() { return objectName ; } /** * For internal use only. */ void changeState(int newState) { int oldState; synchronized (stateLock) { if (state == newState) return; oldState = state; state = newState; stateLock.notifyAll(); } sendStateChangeNotification(oldState, newState); } /** * Returns the string used in debug traces. */ String makeDebugTag() { return "CommunicatorServer["+ getProtocol() + ":" + getPort() + "]" ; } /** * Returns the string used to name the connector thread. */ String makeThreadName() { String result ; if (objectName == null) result = "CommunicatorServer" ; else result = objectName.toString() ; return result ; } /** * This method blocks if there are too many active clients. * Call to <CODE>wait()</CODE> is terminated when a client handler * thread calls <CODE>notifyClientHandlerDeleted(this)</CODE> ; */ private synchronized void waitIfTooManyClients() throws InterruptedException { while (getActiveClientCount() >= maxActiveClientCount) { if (isTraceOn()) { trace("waitIfTooManyClients", "Waiting for a client to terminate") ; } wait(); } } /** * This method blocks until there is no more active client. 
*/ private void waitClientTermination() { int s = clientHandlerVector.size() ; if (isTraceOn()) { if (s >= 1) { trace("waitClientTermination","waiting for " + s + " clients to terminate") ; } } // The ClientHandler will remove themselves from the // clientHandlerVector at the end of their run() method, by // calling notifyClientHandlerDeleted(). // Since the clientHandlerVector is modified by the ClientHandler // threads we must avoid using Enumeration or Iterator to loop // over this array. We must also take care of NoSuchElementException // which could be thrown if the last ClientHandler removes itself // between the call to clientHandlerVector.isEmpty() and the call // to clientHandlerVector.firstElement(). // What we *MUST NOT DO* is locking the clientHandlerVector, because // this would most probably cause a deadlock. // while (! clientHandlerVector.isEmpty()) { try { clientHandlerVector.firstElement().join(); } catch (NoSuchElementException x) { trace("waitClientTermination","No element left: " + x); } } if (isTraceOn()) { if (s >= 1) { trace("waitClientTermination","Ok, let's go...") ; } } } /** * Call <CODE>interrupt()</CODE> on each pending client. */ private void terminateAllClient() { final int s = clientHandlerVector.size() ; if (isTraceOn()) { if (s >= 1) { trace("terminateAllClient","Interrupting " + s + " clients") ; } } // The ClientHandler will remove themselves from the // clientHandlerVector at the end of their run() method, by // calling notifyClientHandlerDeleted(). // Since the clientHandlerVector is modified by the ClientHandler // threads we must avoid using Enumeration or Iterator to loop // over this array. // We cannot use the same logic here than in waitClientTermination() // because there is no guarantee that calling interrupt() on the // ClientHandler will actually terminate the ClientHandler. 
// Since we do not want to wait for the actual ClientHandler // termination, we cannot simply loop over the array until it is // empty (this might result in calling interrupt() endlessly on // the same client handler. So what we do is simply take a snapshot // copy of the vector and loop over the copy. // What we *MUST NOT DO* is locking the clientHandlerVector, because // this would most probably cause a deadlock. // final ClientHandler[] handlers = clientHandlerVector.toArray(new ClientHandler[0]); for (ClientHandler h : handlers) { try { h.interrupt() ; } catch (Exception x) { if (isTraceOn()) trace("terminateAllClient", "Failed to interrupt pending request: "+x+ " - skiping"); } } } /** * Controls the way the CommunicatorServer service is deserialized. */ private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException { // Call the default deserialization of the object. // stream.defaultReadObject(); // Call the specific initialization for the CommunicatorServer service. // This is for transient structures to be initialized to specific // default values. // stateLock = new Object(); state = OFFLINE; stopRequested = false; servedClientCount = 0; clientHandlerVector = new Vector<ClientHandler>(); fatherThread = Thread.currentThread(); mainThread = null; notifCount = 0; notifInfos = null; notifBroadcaster = new NotificationBroadcasterSupport(); dbgTag = makeDebugTag(); } // // NotificationBroadcaster // /** * Adds a listener for the notifications emitted by this * CommunicatorServer. * There is only one type of notifications sent by the CommunicatorServer: * they are <tt>{@link javax.management.AttributeChangeNotification}</tt>, * sent when the <tt>State</tt> attribute of this CommunicatorServer * changes. * * @param listener The listener object which will handle the emitted * notifications. * @param filter The filter object. If filter is null, no filtering * will be performed before handling notifications. 
* @param handback An object which will be sent back unchanged to the * listener when a notification is emitted. * * @exception IllegalArgumentException Listener parameter is null. */ public void addNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback) throws java.lang.IllegalArgumentException { if (isDebugOn()) { debug("addNotificationListener","Adding listener "+ listener + " with filter "+ filter + " and handback "+ handback); } notifBroadcaster.addNotificationListener(listener, filter, handback); } /** * Removes the specified listener from this CommunicatorServer. * Note that if the listener has been registered with different * handback objects or notification filters, all entries corresponding * to the listener will be removed. * * @param listener The listener object to be removed. * * @exception ListenerNotFoundException The listener is not registered. */ public void removeNotificationListener(NotificationListener listener) throws ListenerNotFoundException { if (isDebugOn()) { debug("removeNotificationListener","Removing listener "+ listener); } notifBroadcaster.removeNotificationListener(listener); } /** * Returns an array of MBeanNotificationInfo objects describing * the notification types sent by this CommunicatorServer. * There is only one type of notifications sent by the CommunicatorServer: * it is <tt>{@link javax.management.AttributeChangeNotification}</tt>, * sent when the <tt>State</tt> attribute of this CommunicatorServer * changes. 
*/ public MBeanNotificationInfo[] getNotificationInfo() { // Initialize notifInfos on first call to getNotificationInfo() // if (notifInfos == null) { notifInfos = new MBeanNotificationInfo[1]; String[] notifTypes = { AttributeChangeNotification.ATTRIBUTE_CHANGE}; notifInfos[0] = new MBeanNotificationInfo( notifTypes, AttributeChangeNotification.class.getName(), "Sent to notify that the value of the State attribute "+ "of this CommunicatorServer instance has changed."); } return notifInfos; } /** * */ private void sendStateChangeNotification(int oldState, int newState) { String oldStateString = getStringForState(oldState); String newStateString = getStringForState(newState); String message = new StringBuffer().append(dbgTag) .append(" The value of attribute State has changed from ") .append(oldState).append(" (").append(oldStateString) .append(") to ").append(newState).append(" (") .append(newStateString).append(").").toString(); notifCount++; AttributeChangeNotification notif = new AttributeChangeNotification(this, // source notifCount, // sequence number System.currentTimeMillis(), // time stamp message, // message "State", // attribute name "int", // attribute type new Integer(oldState), // old value new Integer(newState) ); // new value if (isDebugOn()) { debug("sendStateChangeNotification", "Sending AttributeChangeNotification #"+ notifCount + " with message: "+ message); } notifBroadcaster.sendNotification(notif); } /** * */ private static String getStringForState(int s) { switch (s) { case ONLINE: return "ONLINE"; case STARTING: return "STARTING"; case OFFLINE: return "OFFLINE"; case STOPPING: return "STOPPING"; default: return "UNDEFINED"; } } // // MBeanRegistration // /** * Preregister method of connector. * *@param server The <CODE>MBeanServer</CODE> in which the MBean will * be registered. *@param name The object name of the MBean. * *@return The name of the MBean registered. 
*
     * @exception java.lang.Exception This exception should be caught by
     *            the <CODE>MBeanServer</CODE> and re-thrown
     *            as an <CODE>MBeanRegistrationException</CODE>.
     */
    public ObjectName preRegister(MBeanServer server, ObjectName name)
            throws java.lang.Exception {
        synchronized (this) {
            if (bottomMBS != null) {
                throw new IllegalArgumentException("connector already " +
                                                   "registered in an MBean " +
                                                   "server");
            }
            topMBS = bottomMBS = server;
            // Record the name only once the registration has been accepted,
            // so that a rejected second registration leaves the name of the
            // existing registration untouched.
            objectName = name;
        }
        dbgTag = makeDebugTag();
        return name;
    }

    /**
     * Forgets the MBean server recorded by <CODE>preRegister</CODE> when the
     * registration has failed; does nothing otherwise.
     *
     * @param registrationDone Indicates whether or not the MBean has been
     *        successfully registered in the <CODE>MBeanServer</CODE>.
     *        The value false means that the registration phase has failed.
     */
    public void postRegister(Boolean registrationDone) {
        if (!registrationDone.booleanValue()) {
            synchronized (this) {
                topMBS = bottomMBS = null;
            }
        }
    }

    /**
     * Stop the connector.
     * <p>
     * The recorded MBean server and object name are cleared first; the
     * connector is then stopped if its state is <CODE>ONLINE</CODE> or
     * <CODE>STARTING</CODE>.
     *
     * @exception java.lang.Exception This exception should be caught by
     *            the <CODE>MBeanServer</CODE> and re-thrown
     *            as an <CODE>MBeanRegistrationException</CODE>.
     */
    public void preDeregister()
            throws java.lang.Exception {
        synchronized (this) {
            topMBS = bottomMBS = null;
        }
        objectName = null ;
        final int cstate = getState();
        if ((cstate == ONLINE) || ( cstate == STARTING)) {
            stop() ;
        }
    }

    /**
     * Do nothing.
*/
    public void postDeregister(){
    }

    /**
     * Load a class using the default loader repository.
     * <p>
     * <CODE>Class.forName</CODE> is tried first; if it fails, the class is
     * looked up in the class loader repository of the MBean server held in
     * <CODE>bottomMBS</CODE>.
     *
     * @param className the name of the class to load.
     *
     * @return the loaded class.
     *
     * @exception ClassNotFoundException if the class cannot be found by
     *            <CODE>Class.forName</CODE> and no MBean server or class
     *            loader repository is available, or if the repository
     *            cannot find it either.
     **/
    Class loadClass(String className)
            throws ClassNotFoundException {
        try {
            return Class.forName(className);
        } catch (ClassNotFoundException e) {
            // Read the field once; without an MBean server there is no
            // repository to ask, and passing null to the factory would
            // raise a NullPointerException instead of the declared
            // ClassNotFoundException.
            final MBeanServer server = bottomMBS;
            final ClassLoaderRepository clr = (server == null)
                ? null
                : MBeanServerFactory.getClassLoaderRepository(server);
            if (clr == null) {
                // Keep the first failure as the cause.
                throw new ClassNotFoundException(className, e);
            }
            return clr.loadClass(className);
        }
    }

    //
    // Debug stuff
    //

    /**
     * Information type passed to every <CODE>Trace</CODE> call made by the
     * helpers below.
     */
    int infoType;

    /**
     * Tells whether the trace level is selected for <CODE>infoType</CODE>.
     */
    boolean isTraceOn() {
        return Trace.isSelected(Trace.LEVEL_TRACE, infoType);
    }

    /**
     * Sends a message at trace level.
     */
    void trace(String clz, String func, String info) {
        Trace.send(Trace.LEVEL_TRACE, infoType, clz, func, info);
    }

    /**
     * Tells whether the debug level is selected for <CODE>infoType</CODE>.
     */
    boolean isDebugOn() {
        return Trace.isSelected(Trace.LEVEL_DEBUG, infoType);
    }

    /**
     * Sends a message at debug level.
     */
    void debug(String clz, String func, String info) {
        Trace.send(Trace.LEVEL_DEBUG, infoType, clz, func, info);
    }

    /**
     * Sends an exception at debug level.
     */
    void debug(String clz, String func, Throwable exception) {
        Trace.send(Trace.LEVEL_DEBUG, infoType, clz, func, exception);
    }

    /**
     * Sends a message at trace level, using <CODE>dbgTag</CODE> as the
     * first argument.
     */
    void trace(String func, String info) {
        trace(dbgTag, func, info);
    }

    /**
     * Sends a message at debug level, using <CODE>dbgTag</CODE> as the
     * first argument.
     */
    void debug(String func, String info) {
        debug(dbgTag, func, info);
    }

    /**
     * Sends an exception at debug level, using <CODE>dbgTag</CODE> as the
     * first argument.
     */
    void debug(String func, Throwable exception) {
        debug(dbgTag, func, exception);
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -