⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 multiplexer.java

📁 一个java方面的消息订阅发送的源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
        handshake(_out, _in);
    }

    /**
     * Perform handshaking on initial connection, to verify protocol. Subclasses
     * may extend this behaviour.
     *
     * @param out the endpoint's output stream
     * @param in  the endpoint's input stream
     * @throws IOException for any I/O error
     */
    protected void handshake(DataOutputStream out, DataInputStream in)
            throws IOException {
        out.writeInt(MAGIC);
        out.writeInt(VERSION);
        out.flush();

        int magic = in.readInt();
        if (magic != MAGIC) {
            throw new ProtocolException("Expected protocol magic=" + MAGIC
                                        + ", but received=" + magic);
        }
        int version = in.readInt();
        if (version != VERSION) {
            throw new ProtocolException("Expected protocol version=" + VERSION
                                        + ", but received=" + version);
        }
    }

    /**
     * Perform authentication on initial connection.
     *
     * @param principal the security principal. May be <code>null</code>
     * @throws IOException for any I/O error
     * @throws SecurityException if connection is refused by the server
     */
    protected void authenticate(Principal principal)
            throws IOException, SecurityException {
        try {
            if (principal != null && !(principal instanceof BasicPrincipal)) {
                throw new IOException(
                        "Cannot authenticate with principal of type "
                        + principal.getClass().getName());
            }
            if (principal != null) {
                BasicPrincipal basic = (BasicPrincipal) principal;
                _out.writeByte(AUTH_BASIC);
                _out.writeUTF(basic.getName());
                _out.writeUTF(basic.getPassword());
            } else {
                _out.writeByte(AUTH_NONE);
            }
            _out.flush();
            if (_in.readByte() != AUTH_OK) {
                throw new SecurityException("Connection refused");
            }
        } catch (IOException exception) {
            // terminate the connection
            _endpoint.close();
            throw exception;
        }
        _principal = principal;
    }

    /**
     * Performs authentication on initial connection.
     *
     * @param authenticator      the authenticator
     * @throws IOException       for any I/O error
     * @throws ResourceException if the authenticator cannot authenticate
     */
    protected void authenticate(Authenticator authenticator)
            throws IOException, ResourceException {

        try {
            Principal principal = null;
            byte type = _in.readByte();

            switch (type) {
                case AUTH_BASIC:
                    String name = _in.readUTF();
                    String password = _in.readUTF();
                    principal = new BasicPrincipal(name, password);
                    break;
                case AUTH_NONE:
                    break;
                default:
                    throw new IOException("Invalid packet type: " + type);
            }
            if (authenticator.authenticate(principal)) {
                _out.writeByte(AUTH_OK);
                _out.flush();
            } else {
                _out.writeByte(AUTH_DENIED);
                _out.flush();
                throw new SecurityException("User " + principal
                                             + " unauthorised");
            }
            _principal = principal;
        } catch (IOException exception) {
            // terminate the connection
            _endpoint.close();
            throw exception;
        } catch (ResourceException exception) {
            // terminate the connection
            _endpoint.close();
            throw exception;
        }
    }

    /**
     * Opens a new channel.
     *
     * @return a new channel
     * @throws IOException if a channel can't be opened
     */
    protected Channel open() throws IOException {
        Channel channel;
        int channelId;
        synchronized (_channels) {
            channelId = getNextChannelId();
            channel = addChannel(channelId);
        }

        send(OPEN, channelId);
        return channel;
    }

    /**
     * Read a packet from the endpoint.
     */
    private void multiplex() {
        try {
            byte type = _in.readByte();
            switch (type) {
                case OPEN:
                    handleOpen();
                    break;
                case CLOSE:
                    handleClose();
                    break;
                case REQUEST:
                    handleRequest();
                    break;
                case RESPONSE:
                    handleResponse();
                    break;
                case DATA:
                    handleData();
                    break;
                case PING_REQUEST:
                    handlePingRequest();
                    break;
                case PING_RESPONSE:
                    handlePingResponse();
                    break;
                case FLOW_READ:
                    handleFlowRead();
                    break;
                case SHUTDOWN:
                    handleShutdown();
                    break;
                default:
                    throw new IOException("Unrecognised message type: "
                                          + type);
            }
        } catch (Exception exception) {
            boolean closed = _closed;
            shutdown();
            if (!closed) {
                _log.debug("Multiplexer shutting down on error", exception);
                // error notify the listener
                _listener.error(exception);
            }
        }
    }

    /**
     * Shuts down the multiplexer.
     */
    private void shutdown() {
        // mark this as closed
        _closed = true;

        // notify the channels
        Channel[] channels;
        synchronized (_channels) {
            channels = (Channel[]) _channels.values().toArray(new Channel[0]);
        }
        for (int i = 0; i < channels.length; ++i) {
            channels[i].disconnected();
        }
    }

    /**
     * Open a new channel.
     *
     * @throws IOException for any error
     */
    private void handleOpen() throws IOException {
        int channelId = _in.readUnsignedShort();
        Integer key = new Integer(channelId);

        synchronized (_channels) {
            if (_channels.get(key) != null) {
                throw new IOException(
                        "A channel already exists with identifier: " + key);
            }
            addChannel(channelId);
        }
    }

    /**
     * Close a channel.
     *
     * @throws IOException for any error
     */
    private void handleClose() throws IOException {
        int channelId = _in.readUnsignedShort();
        Integer key = new Integer(channelId);

        synchronized (_channels) {
            Channel channel = (Channel) _channels.remove(key);
            if (channel == null) {
                throw new IOException(
                        "No channel exists with identifier: " + key);
            }
            channel.close();
        }
    }

    /**
     * Handle a <code>REQUEST</code> packet.
     *
     * @throws IOException if an I/O error occurs, or no channel exists matching
     *                     that read from the packet
     */
    private void handleRequest() throws IOException {
        final Channel channel = handleData();
        Runnable request = new Runnable() {
            public void run() {
                if (_log.isDebugEnabled()) {
                    _log.debug("handleRequest() [channel="
                               + channel.getId() + "]");
                }
                // todo - need to handle closed()
                _listener.request(channel);

                if (_log.isDebugEnabled()) {
                    _log.debug("handleRequest() [channel="
                               + channel.getId() + "] - end");
                }
            }
        };
        try {
            _pool.execute(request);
        } catch (InterruptedException exception) {
            throw new InterruptedIOException(exception.getMessage());
        }
    }

    /**
     * Handle a <code>RESPONSE</code> packet.
     *
     * @throws IOException if an I/O error occurs, or no channel exists matching
     *                     that read from the packet
     */
    private void handleResponse() throws IOException {
        handleData();
    }

    /**
     * Handle a <code>PING_REQUEST</code> packet.
     *
     * @throws IOException if an I/O error occurs
     */
    private void handlePingRequest() throws IOException {
        Channel channel = readChannel();
        channel.handlePingRequest();
    }

    /**
     * Handle a <code>PING_RESPONSE</code> packet.
     *
     * @throws IOException if an I/O error occurs
     */
    private void handlePingResponse() throws IOException {
        Channel channel = readChannel();
        channel.handlePingResponse();
    }

    /**
     * Handle a <code>DATA</code> packet.
     *
     * @return the channel to handle the packet
     * @throws IOException if an I/O error occurs, or no channel exists matching
     *                     that read from the packet
     */
    private Channel handleData() throws IOException {
        Channel channel = readChannel();
        int length = _in.readInt();
        channel.getMultiplexInputStream().receive(_in, length);
        return channel;
    }

    /**
     * Handle a <code>FLOW_READ</code> packet.
     *
     * @throws IOException if an I/O error occurs
     */
    private void handleFlowRead() throws IOException {
        Channel channel = readChannel();
        int read = _in.readInt();
        channel.getMultiplexOutputStream().notifyRead(read);
    }

    /**
     * Handle a <code>SHUTDOWN</code> packet.
     */
    private void handleShutdown() {
        shutdown();
        _listener.closed();
    }

    /**
     * Adds a new channel.
     * <p/>
     * NOTE: Must be invoked with <code>_channels</code> synchronized
     *
     * @param channelId the channel identifier
     * @return the new channel
     */
    private Channel addChannel(int channelId) {
        int size = BUFFER_SIZE;
        MultiplexOutputStream out =
                new MultiplexOutputStream(channelId, this, size, size);
        MultiplexInputStream in =
                new MultiplexInputStream(channelId, this, size);
        Channel channel = new Channel(channelId, this, in, out);
        _channels.put(new Integer(channelId), channel);
        return channel;
    }

    /**
     * Reads the channel identifier from the stream and returns the
     * corresponding channel.
     *
     * @return the channel corresponding to the read channel identifier
     * @throws IOException for any I/O error, or if there is no corresponding
     *                     channel
     */
    private Channel readChannel() throws IOException {
        int channelId = _in.readUnsignedShort();
        return getChannel(channelId);
    }

    /**
     * Returns a channel given its identifier.
     *
     * @param channelId the channel identifier
     * @return the channel corresponding to <code>channelId</code>
     * @throws IOException if there is no corresponding channel
     */
    private Channel getChannel(int channelId) throws IOException {
        Channel channel;
        Integer key = new Integer(channelId);
        synchronized (_channels) {
            channel = (Channel) _channels.get(key);
            if (channel == null) {
                throw new IOException(
                        "No channel exists with identifier: " + channelId);
            }
        }
        return channel;
    }

    /**
     * Returns the next available channel identifier. Channel identifiers
     * generated on the client side are in the range 0x0..0x7FFF, on the server
     * side, 0x8000-0xFFFF
     * <p/>
     * NOTE: Must be invoked with <code>_channels</code> synchronized
     *
     * @return the next channel identifier
     * @throws IOException if the connection is closed
     */
    private int getNextChannelId() throws IOException {
        final int mask = 0x7fff;
        final int serverIdBase = 0x8000;
        int channelId = 0;
        while (!_closed) {
            _seed = (_seed + 1) & mask;
            channelId = (_client) ? _seed : _seed + serverIdBase;
            if (!_channels.containsKey(new Integer(channelId))) {
                break;
            }
        }
        if (_closed) {
            throw new IOException("Connection has been closed");
        }
        return channelId;
    }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -