⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 multiplexedconnection.java

📁 这是linux下ssl vpn的实现程序
💻 JAVA
字号:
/*
 * Created on 08-May-2005
 *
 * TODO To change the template for this generated file go to
 * Window - Preferences - Java - Code Style - Code Templates
 */
package com.maverick.multiplex;

import java.io.*;
import java.util.Vector;
import java.util.Enumeration;

/**
 * Allows multiple channels to operate over a single pair of streams.
 *
 * <p>
 * The constructor starts a reader thread ({@link Runner}) that reads
 * length-prefixed messages from the input stream and dispatches them by
 * message id. Channel related messages are queued on the message store of the
 * channel they address; channel open requests from the remote side are served
 * through the {@link ChannelFactory} supplied at construction time.
 *
 * <p>
 * Outgoing messages are serialised through the synchronized
 * {@link #sendMessage(Packet)} so that packets written by different threads
 * are not interleaved on the output stream.
 *
 * @author lee
 */
public class MultiplexedConnection {

    /** Channel table indexed by local channel id; a <code>null</code> slot is free. */
    Channel[] channels;
    /** Stored from the constructor; not read anywhere in this class. */
    int connectionLimit;
    /** Stored from the constructor; not read anywhere in this class. */
    int inactivityTimeout;
    DataInputStream in;
    DataOutputStream out;
    /**
     * Loop flag of the reader thread. Declared volatile because it is cleared
     * by {@link #stop()} on arbitrary threads and polled by the reader thread.
     */
    volatile boolean running = false;
    /** Creates channels for remote open requests; may be <code>null</code>. */
    ChannelFactory factory;
    /** Number of channels ever allocated (never decremented). */
    int totalChannels;
    /** The reader thread executing {@link Runner}. */
    Thread thread;
    /** Channels currently occupying a slot in {@link #channels}. */
    Vector activeChannels = new Vector();
    /** Registered {@link MultiplexedConnectionListener}s. */
    Vector listeners = new Vector();

    /* DEBUG */static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
    /* DEBUG */                .getLog(MultiplexedConnection.class);

    public static final int MSG_CHANNEL_OPEN = 1;
    public static final int MSG_CHANNEL_OPEN_CONFIRMATION = 2;
    public static final int MSG_CHANNEL_OPEN_FAILURE = 3;
    public static final int MSG_CHANNEL_DATA = 4;
    public static final int MSG_CHANNEL_WINDOW_ADJUST = 5;
    public static final int MSG_CHANNEL_CLOSE = 6;
    public static final int MSG_DISCONNECT = 7;

    /**
     * Observer used by {@link #openChannel(Channel)} to pick the reply to a
     * channel open request (confirmation or failure) out of a channel's
     * message store.
     */
    MessageObserver channelOpenMessages = new MessageObserver() {
        public boolean wantsNotification(Message msg) {
            int id = msg.getMessageId();
            return id == MSG_CHANNEL_OPEN_CONFIRMATION || id == MSG_CHANNEL_OPEN_FAILURE;
        }
    };

    /**
     * Creates the connection and immediately starts its reader thread.
     *
     * @param maxChannels size of the channel table; values of zero or less
     *        select a table of 100 slots
     * @param connectionLimit stored but not otherwise used by this class
     * @param inactivityTimeout stored but not otherwise used by this class
     * @param in stream incoming messages are read from
     * @param out stream outgoing messages are written to
     * @param factory creates channels for open requests from the remote side;
     *        if <code>null</code> every remote open request is rejected
     */
    public MultiplexedConnection(int maxChannels, int connectionLimit, int inactivityTimeout, InputStream in, OutputStream out,
                    ChannelFactory factory) {
        channels = new Channel[maxChannels > 0 ? maxChannels : 100];
        this.connectionLimit = connectionLimit;
        this.inactivityTimeout = inactivityTimeout;
        this.in = new DataInputStream(in);
        this.out = new DataOutputStream(out);
        this.factory = factory;
        this.thread = new Thread(new Runner());
        // Raise the flag before the thread starts: if the reader thread set it
        // itself, a stop() issued before the thread got scheduled would be
        // overwritten and lost.
        running = true;
        thread.start();
    }

    /**
     * @return the number of channels allocated over the lifetime of this
     *         connection, including channels that have since been freed
     */
    public int getTotalChannelCount() {
        return totalChannels;
    }

    /**
     * @return the number of channels currently allocated
     */
    public int getActiveChannelCount() {
        return activeChannels.size();
    }

    /**
     * @return a snapshot array of the currently allocated channels
     */
    public synchronized Channel[] getActiveChannels() {
        Channel[] tmp = new Channel[activeChannels.size()];
        activeChannels.copyInto(tmp);
        return tmp;
    }

    /**
     * Registers a listener that is told when the reader thread terminates.
     *
     * @param listener the listener to add; <code>null</code> is ignored
     */
    public void addListener(MultiplexedConnectionListener listener) {
        if (listener != null)
            listeners.addElement(listener);
    }

    /**
     * Asks the reader thread to terminate. When called from another thread the
     * reader thread is also interrupted.
     */
    public void stop() {
        /* DEBUG */log.info("Shutting down multiplexed connection thread");
        running = false;
        if (!Thread.currentThread().equals(thread))
            thread.interrupt();
    }

    /**
     * Handles a channel open request from the remote side: creates the channel
     * through the factory, allocates a local id and answers with either an
     * open confirmation or an open failure.
     *
     * @param msg the open request, positioned after the message id
     * @throws IOException if the message cannot be parsed or the reply cannot
     *         be sent
     */
    protected void onChannelOpen(Message msg) throws IOException {

        String type = msg.readString();
        int remoteid = (int) msg.readInt();
        int remotepacket = (int) msg.readInt();
        int remotewindow = (int) msg.readInt();

        // Whatever follows the fixed header is channel specific open data.
        byte[] data = new byte[msg.available()];
        msg.read(data);

        if (factory == null) {
            sendChannelOpenFailure(remoteid, "This connection does not support the opening of channels");
            return;
        }

        Channel channel = factory.createChannel(type);

        if (channel == null) {
            sendChannelOpenFailure(remoteid, "Failed to create channel of type " + type);
            return;
        }

        channel.init(this, remoteid, remotepacket, remotewindow);

        if (allocateChannel(channel) == -1) {
            // The channel was never opened, so no open event must be fired.
            sendChannelOpenFailure(remoteid, "Too many channels already open");
            return;
        }

        data = channel.open(data);
        sendChannelOpenConfirmation(channel, data);
        channel.fireChannelOpen();
    }

    /**
     * Queues a channel level message on the message store of the channel it
     * addresses. Messages for unknown or out-of-range channel ids are logged
     * and dropped.
     *
     * @param msg the message, positioned after the message id
     * @throws IOException if the channel id cannot be read
     */
    protected void onChannelMessage(Message msg) throws IOException {

        int channelid = (int) msg.readInt();

        // The id comes straight off the wire; an unchecked index would raise an
        // ArrayIndexOutOfBoundsException that the reader loop does not catch.
        Channel channel = null;
        if (channelid >= 0 && channelid < channels.length) {
            channel = channels[channelid];
        }

        if (channel != null) {
            channel.messageStore.addMessage(msg);
        } else {
            /* DEBUG */log.warn("Message received for non-existent channel id " + channelid);

        }
    }

    /**
     * Handles a disconnect message from the remote side by stopping the reader
     * thread.
     *
     * @param msg the disconnect message, positioned after the message id
     * @throws IOException if the message cannot be parsed
     */
    protected void onDisconnect(Message msg) throws IOException {
        // The reason code is not used, but it has to be consumed to reach the
        // description that follows it.
        msg.readInt();
        String desc = msg.readString();
        /* DEBUG */log.info("Remote disconnected:" + desc);
        stop();
    }

    /**
     * Places the channel in the first free slot of the channel table and
     * records the slot index as the channel's local id.
     *
     * @param channel the channel to register
     * @return the allocated channel id, or -1 if the table is full
     */
    private synchronized int allocateChannel(Channel channel) {
        for (int i = 0; i < channels.length; i++) {
            if (channels[i] == null) {
                channels[i] = channel;
                activeChannels.addElement(channel);
                totalChannels++;
                channel.channelid = i;
                return i;
            }
        }
        return -1;
    }

    /**
     * Releases the slot held by the channel and removes it from the list of
     * active channels.
     *
     * @param channel the channel to release
     */
    synchronized void freeChannel(Channel channel) {
        int id = channel.channelid;
        // Only clear the slot if this channel really owns it, so that a stale or
        // repeated free cannot wipe a slot since handed to another channel.
        if (id >= 0 && id < channels.length && channels[id] == channel) {
            channels[id] = null;
        }
        activeChannels.removeElement(channel);
    }

    /**
     * Opens a channel towards the remote side and blocks until the remote side
     * confirms or rejects it.
     *
     * @param channel the channel to open
     * @throws IOException if no channel slot is free, the request cannot be
     *         sent, or the remote side rejects the request (the message is
     *         then the description sent by the remote side)
     */
    public synchronized void openChannel(Channel channel) throws IOException {

        byte[] data = channel.create();

        if (allocateChannel(channel) == -1) {
            throw new IOException("Failed to allocate channel: too many active channels");
        }

        boolean opened = false;
        try {
            Packet msg = new Packet();
            msg.write(MSG_CHANNEL_OPEN);
            msg.writeString(channel.getType());
            msg.writeInt(channel.channelid);
            msg.writeInt(channel.getLocalPacket());
            msg.writeInt(channel.getLocalWindow());
            if (data != null)
                msg.write(data);

            sendMessage(msg);

            // The reader thread routes the reply into this channel's store.
            Message reply = channel.messageStore.nextMessage(channelOpenMessages);

            switch (reply.getMessageId()) {
                case MSG_CHANNEL_OPEN_CONFIRMATION:
                    int remoteid = (int) reply.readInt();
                    int remotepacket = (int) reply.readInt();
                    int remotewindow = (int) reply.readInt();
                    channel.init(this, remoteid, remotepacket, remotewindow);
                    opened = true;
                    channel.fireChannelOpen();
                    break;
                case MSG_CHANNEL_OPEN_FAILURE:
                    String desc = reply.readString();
                    throw new IOException(desc);
                default:
                    throw new IOException("Unexpected reply in channel open procedure");
            }
        } finally {
            // A channel that never reached the open state must give its slot
            // back, otherwise every failed open permanently consumes an id.
            if (!opened) {
                freeChannel(channel);
            }
        }

    }

    /**
     * Sends channel data to the remote side.
     *
     * @param channel the channel the data belongs to
     * @param data buffer holding the data
     * @param off offset of the first byte to send
     * @param len number of bytes to send
     * @throws IOException if the message cannot be written
     */
    public void sendChannelData(Channel channel, byte[] data, int off, int len) throws IOException {
        Packet msg = new Packet();
        msg.write(MSG_CHANNEL_DATA);
        msg.writeInt(channel.remoteid);
        msg.writeBinaryString(data, off, len);

        sendMessage(msg);
    }

    /**
     * Rejects a channel open request from the remote side.
     *
     * @param channelid the id the remote side assigned to the channel
     * @param desc human readable reason for the rejection
     * @throws IOException if the message cannot be written
     */
    private void sendChannelOpenFailure(int channelid, String desc) throws IOException {
        Packet msg = new Packet();
        // Must be the failure id: openChannel() on the peer parses the reply
        // according to its message id.
        msg.write(MSG_CHANNEL_OPEN_FAILURE);
        msg.writeInt(channelid);
        msg.writeString(desc);

        sendMessage(msg);
    }

    /**
     * Confirms a channel open request from the remote side.
     *
     * @param channel the newly opened channel
     * @param data optional channel specific data appended to the confirmation;
     *        may be <code>null</code>
     * @throws IOException if the message cannot be written
     */
    private void sendChannelOpenConfirmation(Channel channel, byte[] data) throws IOException {
        Packet msg = new Packet();
        msg.write(MSG_CHANNEL_OPEN_CONFIRMATION);
        msg.writeInt(channel.remoteid);
        msg.writeInt(channel.channelid);
        msg.writeInt(channel.getLocalPacket());
        msg.writeInt(channel.getLocalWindow());
        if (data != null)
            msg.write(data);

        sendMessage(msg);
    }

    /**
     * Sends a window adjust message for the channel.
     *
     * @param channel the channel whose window is adjusted
     * @param increment the value written as the adjustment
     * @throws IOException if the message cannot be written
     */
    public void sendWindowAdjust(Channel channel, int increment) throws IOException {
        Packet msg = new Packet();
        msg.write(MSG_CHANNEL_WINDOW_ADJUST);
        msg.writeInt(channel.remoteid);
        msg.writeInt(increment);

        sendMessage(msg);
    }

    /**
     * Sends a channel close message for the channel. This method only sends
     * the message; it does not release the channel's local slot.
     *
     * @param channel the channel being closed
     * @throws IOException if the message cannot be written
     */
    public void closeChannel(Channel channel) throws IOException {
        Packet msg = new Packet();
        msg.write(MSG_CHANNEL_CLOSE);
        msg.writeInt(channel.remoteid);

        sendMessage(msg);
    }

    /**
     * Closes every channel currently held in the channel table. Failures of
     * individual channels are ignored so that the remaining channels are still
     * closed.
     */
    public void closeAllChannels() {

        for (int i = 0; i < channels.length; i++) {
            // Work on a local reference: the slot may be cleared by another
            // thread between the null check and the call.
            Channel channel = channels[i];
            if (channel != null) {
                try {
                    channel.close();
                } catch (Throwable ignored) {
                    // Best effort: keep closing the other channels.
                }
            }
        }
    }

    /**
     * Closes all channels, sends a disconnect message to the remote side and
     * closes both streams. The streams are closed even if the disconnect
     * message cannot be sent.
     *
     * @param reason reason code written to the disconnect message
     * @param desc description written to the disconnect message
     */
    public void disconnect(int reason, String desc) {

        /* DEBUG */log.info("Disconnecting multiplexed connection");

        closeAllChannels();

        try {
            Packet msg = new Packet();
            msg.write(MSG_DISCONNECT);
            msg.writeInt(reason);
            msg.writeString(desc);

            sendMessage(msg);

        } catch (IOException ex) {
            /* DEBUG */log.info("Error on disconnect", ex);
        } finally {
            try {
                in.close();
            } catch (Throwable ignored) {
                // Nothing useful can be done if closing fails.
            }
            try {
                out.close();
            } catch (Throwable ignored) {
                // Nothing useful can be done if closing fails.
            }
        }
    }

    /**
     * Writes a packet to the output stream and flushes it. Synchronized so
     * that packets from different threads are written one at a time.
     *
     * @param msg the packet to send
     * @throws IOException if writing to the output stream fails
     */
    protected synchronized void sendMessage(Packet msg) throws IOException {

        msg.prepare();

        // log.info("Writing " + msg.size() + " bytes of socket data");
        out.write(msg.array(), 0, msg.size());
        out.flush();
    }

    /**
     * Body of the reader thread: reads messages from the input stream and
     * dispatches them until the connection is stopped.
     */
    class Runner implements Runnable {

        /**
         * Perform the multiplexed connection protocol
         */
        public void run() {

            // The running flag is raised by the constructor before this thread
            // is started, so it is deliberately not set again here.
            try {
                while (running) {

                    try {

                        // Every message is preceded by its length as a 4 byte int.
                        int msglength = in.readInt();

                        if (msglength <= 0) {
                            /* DEBUG */log.error("Invalid message length of " + msglength + " bytes");
                            stop();
                        } else {

                            // NOTE(review): the length is not capped, so the
                            // peer controls the size of this allocation.
                            byte[] tmp = new byte[msglength];
                            in.readFully(tmp);

                            Message msg = new Message(tmp);

                            switch (msg.getMessageId()) {
                                case MSG_CHANNEL_OPEN:
                                    onChannelOpen(msg);
                                    break;
                                case MSG_CHANNEL_OPEN_CONFIRMATION:
                                case MSG_CHANNEL_OPEN_FAILURE:
                                case MSG_CHANNEL_DATA:
                                case MSG_CHANNEL_WINDOW_ADJUST:
                                case MSG_CHANNEL_CLOSE:
                                    // All channel level messages are queued on
                                    // the channel they address.
                                    onChannelMessage(msg);
                                    break;
                                case MSG_DISCONNECT:
                                    onDisconnect(msg);
                                    break;
                                default:
                                    throw new IOException("Unexpected message id " + msg.getMessageId());

                            }

                        }

                    } catch (IOException ex) {
                        /* DEBUG */log.error("Multiplexed connection thread failed", ex);
                        stop();
                    }

                }
            } finally {
                // Runs on every exit path, including an unexpected runtime
                // exception, so listeners always learn that the thread ended.
                running = false;
                for (Enumeration it = listeners.elements(); it.hasMoreElements();) {
                    ((MultiplexedConnectionListener) it.nextElement()).onConnectionClose();
                }
            }
        }
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -