
📄 connectionmultiplexersocketreader.java

📁 Instant messaging server based on the Jabber protocol
💻 JAVA
/**
 * $RCSfile: $
 * $Revision: $
 * $Date: $
 *
 * Copyright (C) 2006 Jive Software. All rights reserved.
 *
 * This software is published under the terms of the GNU Public License (GPL),
 * a copy of which is included in this distribution.
 */

package org.jivesoftware.wildfire.net;

import org.dom4j.Element;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import org.jivesoftware.wildfire.PacketRouter;
import org.jivesoftware.wildfire.RoutingTable;
import org.jivesoftware.wildfire.Session;
import org.jivesoftware.wildfire.SessionManager;
import org.jivesoftware.wildfire.auth.UnauthorizedException;
import org.jivesoftware.wildfire.multiplex.ConnectionMultiplexerSession;
import org.jivesoftware.wildfire.multiplex.MultiplexerPacketHandler;
import org.jivesoftware.wildfire.multiplex.Route;
import org.xmlpull.v1.XmlPullParserException;
import org.xmpp.packet.IQ;
import org.xmpp.packet.Message;
import org.xmpp.packet.PacketError;
import org.xmpp.packet.Presence;

import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.*;

/**
 * A SocketReader specialized for connection manager connections. Connection managers may have
 * one or more connections to the server. Each connection will have its own instance of this
 * class. Each connection will send packets, sent from clients connected to the connection
 * manager, to the server. Moreover, the server will use any of the available connections
 * to the connection manager to send packets to connected clients through the connection manager.<p>
 *
 * Each socket reader has its own thread pool to process many packets in parallel. The thread pool
 * by default will use 10 core threads, a queue of 50 elements and a max number of 100 threads.
 * The pool will use the 10 core threads in parallel and queue packets. When the queue is full
 * then more threads will be created until the max number is reached. Any created thread that
 * exceeds the core number of threads will be killed when idle for 1 minute. The thread pool
 * configuration can be modified by setting the system properties:
 * <ul>
 *  <li>xmpp.multiplex.processing.core.threads
 *  <li>xmpp.multiplex.processing.max.threads
 *  <li>xmpp.multiplex.processing.queue
 * </ul>
 *
 * Each Connection Manager has its own domain. Each connection from the same connection manager
 * uses a different resource. Unlike any other session, connection manager sessions are not
 * present in the routing table. This means that connection managers are not reachable entities.
 * In other words, entities cannot send packets to connection managers, only to the clients
 * hosted by them. The main reason behind this design decision is that connection managers are
 * private components of the server so they can only be contacted by the server. Connection Manager
 * sessions are present in {@link SessionManager} but not in {@link RoutingTable}.
 *
 * @author Gaston Dombiak
 */
public class ConnectionMultiplexerSocketReader extends SocketReader {

    /**
     * Pool of threads that are available for processing the requests.
     */
    private ThreadPoolExecutor threadPool;

    /**
     * Queue used when thread pool is exhausted (i.e. core threads, queue and max threads are all busy). Once
     * the thread pool is exhausted, incoming packets will be placed into this queue. Once the queue is emptied
     * incoming packets will go to the thread pool.<p>
     *
     * Note that the queue is unbounded so we may potentially consume all Java memory. A future version may make
     * Connection Managers smarter and throttle traffic to the server to avoid this problem.
     */
    private BlockingQueue<Runnable> overflowBuffer = new LinkedBlockingQueue<Runnable>();

    /**
     * Handler of IQ packets sent from the Connection Manager to the server.
     */
    private MultiplexerPacketHandler packetHandler;

    public ConnectionMultiplexerSocketReader(PacketRouter router, RoutingTable routingTable,
            String serverName, Socket socket, SocketConnection connection,
            boolean useBlockingMode) {
        super(router, routingTable, serverName, socket, connection, useBlockingMode);
        // Create a pool of threads that will process received packets. Tasks rejected by an
        // exhausted pool are stored in the overflow buffer and resubmitted later
        int coreThreads = JiveGlobals.getIntProperty("xmpp.multiplex.processing.core.threads", 10);
        int maxThreads = JiveGlobals.getIntProperty("xmpp.multiplex.processing.max.threads", 100);
        int queueSize = JiveGlobals.getIntProperty("xmpp.multiplex.processing.queue", 50);
        threadPool =
                new ThreadPoolExecutor(coreThreads, maxThreads, 60, TimeUnit.SECONDS,
                        new LinkedBlockingQueue<Runnable>(queueSize),
                        new RejectedExecutionHandler() {
                            /**
                             * Stores rejected tasks in the overflow queue.
                             * @param r the rejected task.
                             * @param executor thread pool executor.
                             */
                            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                                if (!executor.isShutdown()) {
                                    overflowBuffer.add(r);
                                }
                            }
                        });

        // Thread that will consume packets present in the overflow queue. The thread will monitor the threadPool
        // and when a thread in the pool is available it will remove the oldest packet in the overflow queue and
        // process it with the idle threads.
        Thread overflowThread = new Thread() {
            public void run() {
                while (!threadPool.isShutdown() && !threadPool.isTerminated()) {
                    try {
                        // Get the next task that has been rejected when the threadPool was exhausted
                        Runnable runnable = overflowBuffer.take();
                        // Wait until the pool has available threads
                        while (threadPool.getActiveCount() >= threadPool.getMaximumPoolSize()) {
                            Thread.sleep(100);
                        }
                        // Process the rejected task
                        threadPool.execute(runnable);
                    }
                    catch (InterruptedException e) {
                        // Do nothing
                    }
                    catch (Exception e) {
                        Log.error("Error consuming overflow buffer", e);
                    }
                }
            }
        };
        overflowThread.start();
    }

    boolean createSession(String namespace)
            throws UnauthorizedException, XmlPullParserException, IOException {
        if (getNamespace().equals(namespace)) {
            // The connected client is a connection manager so create a ConnectionMultiplexerSession
            session = ConnectionMultiplexerSession.createSession(serverName, reader, connection);
            packetHandler = new MultiplexerPacketHandler(session.getAddress().getDomain());
            return true;
        }
        return false;
    }

    String getNamespace() {
        return "jabber:connectionmanager";
    }

    protected void processIQ(final IQ packet) throws UnauthorizedException {
        if (session.getStatus() != Session.STATUS_AUTHENTICATED) {
            // Session is not authenticated so return error
            IQ reply = new IQ();
            reply.setChildElement(packet.getChildElement().createCopy());
            reply.setID(packet.getID());
            reply.setTo(packet.getFrom());
            reply.setFrom(packet.getTo());
            reply.setError(PacketError.Condition.not_authorized);
            session.process(reply);
            return;
        }
        // Process the packet in another thread
        threadPool.execute(new Runnable() {
            public void run() {
                packetHandler.handle(packet);
            }
        });
    }

    /**
     * Process stanza sent by a client that is connected to a connection manager. The
     * original stanza is wrapped in the route element. Only a single stanza must be
     * wrapped in the route element.
     *
     * @param packet the route element.
     */
    private void processRoute(final Route packet) throws UnauthorizedException {
        if (session.getStatus() != Session.STATUS_AUTHENTICATED) {
            // Session is not authenticated so return error
            Route reply = new Route(packet.getStreamID());
            reply.setID(packet.getID());
            reply.setTo(packet.getFrom());
            reply.setFrom(packet.getTo());
            reply.setError(PacketError.Condition.not_authorized);
            session.process(reply);
            return;
        }
        // Process the packet in another thread
        Runnable runnable = new Runnable() {
            public void run() {
                packetHandler.route(packet);
            }
        };
        if (!overflowBuffer.isEmpty()) {
            // Thread pool is exhausted or we are still recovering from a recent exhausted state.
            // Keep placing tasks in this queue until the queue is empty. The queue will help us
            // keep the chronological order of incoming packets. Note that if we don't care about
            // being chronologically correct then we should just add the task to the threadPool.
            overflowBuffer.add(runnable);
        }
        else {
            // Thread pool is not exhausted and we are not recovering from an exhausted state so just
            // run the task using the thread pool
            threadPool.execute(runnable);
        }
    }

    protected void processMessage(final Message packet) throws UnauthorizedException {
        throw new UnauthorizedException("Message packets are not supported. Original packets " +
                "should be wrapped by IQ packets.");
    }

    protected void processPresence(final Presence packet) throws UnauthorizedException {
        throw new UnauthorizedException("Presence packets are not supported. Original packets " +
                "should be wrapped by IQ packets.");
    }

    boolean processUnknowPacket(Element doc) {
        String tag = doc.getName();
        if ("route".equals(tag)) {
            // Process stanza wrapped by the route packet
            try {
                processRoute(new Route(doc));
                return true;
            }
            catch (UnauthorizedException e) {
                // Should never happen
            }
        }
        else if ("handshake".equals(tag)) {
            open = ((ConnectionMultiplexerSession)session).authenticate(doc.getStringValue());
            return true;
        }
        else if ("error".equals(tag) && "stream".equals(doc.getNamespacePrefix())) {
            session.getConnection().close();
            open = false;
            return true;
        }
        return false;
    }

    protected void shutdown() {
        super.shutdown();
        // Shutdown the pool of threads that are processing packets sent by
        // the connection manager
        threadPool.shutdown();
    }

    String getName() {
        return "ConnectionMultiplexer SR - " + hashCode();
    }

    boolean validateHost() {
        return false;
    }
}
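
The class above combines a bounded `ThreadPoolExecutor` with an unbounded overflow queue that a separate thread drains back into the pool. The following is a minimal standalone sketch of that pattern, not the Wildfire implementation: the class name `OverflowPoolSketch`, the method `runAll`, the tiny pool sizes and the short sleeps are all assumptions chosen to force rejections quickly. Unlike the original, the drain thread here polls with a timeout and is a daemon so the example can terminate.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names and sizes are illustrative assumptions.
class OverflowPoolSketch {

    /** Pushes taskCount short tasks through a deliberately tiny pool; returns how many ran. */
    static int runAll(int taskCount) {
        // Unbounded buffer for tasks the pool rejects (same trade-off as the original: may grow without limit).
        final BlockingQueue<Runnable> overflow = new LinkedBlockingQueue<>();
        // 1 core thread, 2 max threads, queue of 2: the 5th concurrent task is rejected.
        final ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2),
                (r, executor) -> {
                    if (!executor.isShutdown()) {
                        overflow.add(r);
                    }
                });

        // Drain thread: resubmits buffered tasks once the pool reports a free thread.
        Thread drainer = new Thread(() -> {
            while (!pool.isShutdown()) {
                try {
                    Runnable task = overflow.poll(50, TimeUnit.MILLISECONDS);
                    if (task == null) {
                        continue;
                    }
                    while (pool.getActiveCount() >= pool.getMaximumPoolSize()) {
                        Thread.sleep(5);
                    }
                    // If the pool is still full, the rejection handler buffers the task again.
                    pool.execute(task);
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        drainer.setDaemon(true);
        drainer.start();

        final AtomicInteger completed = new AtomicInteger();
        final CountDownLatch done = new CountDownLatch(taskCount);
        for (int i = 0; i < taskCount; i++) {
            Runnable task = () -> {
                try {
                    Thread.sleep(2); // simulate packet handling work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
                done.countDown();
            };
            // As in processRoute: while the buffer is non-empty, keep queueing behind it.
            if (!overflow.isEmpty()) {
                overflow.add(task);
            } else {
                pool.execute(task);
            }
        }

        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + runAll(20));
    }
}
```

Note that `getActiveCount()` is only an approximation, so a resubmitted task can be rejected a second time and land at the tail of the buffer. The sketch, like the original, therefore favors not losing tasks over a strict ordering guarantee.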
