⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 channel.java

📁 这是linux下ssl vpn的实现程序
💻 JAVA
字号:
/*
 * Created on 08-May-2005
 *
 * TODO To change the template for this generated file go to
 * Window - Preferences - Java - Code Style - Code Templates
 */
package com.maverick.multiplex;

import java.io.*;
/* DEBUG */import org.apache.commons.logging.*;
import java.util.Vector;
import java.util.Enumeration;
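/**
 * Base class for a single logical channel carried over a
 * {@link MultiplexedConnection}. A channel owns one input and one output
 * stream and keeps a local and a remote data window that are used for flow
 * control; subclasses supply the open/create payloads and the open/close
 * hooks.
 *
 * @author lee
 */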
public abstract class Channel {

    // Connection this channel is multiplexed over; assigned in init().
    MultiplexedConnection connection;
	// Local channel identifier. It is not assigned anywhere in this class -
	// NOTE(review): presumably set by MultiplexedConnection; confirm.
	int channelid;
	// Remote channel identifier; assigned in init().
	int remoteid;
	// Channel type name supplied to the constructor.
	String type;

	// Window for data we send: created in init(), reduced by
	// ChannelOutputStream.write() and increased when a
	// MSG_CHANNEL_WINDOW_ADJUST message is processed.
	DataWindow remotewindow;
	// Window for data we receive: created in the constructor, reduced by
	// ChannelInputStream.write() and increased by adjustWindow().
	DataWindow localwindow;
        // ChannelListener instances notified when the channel opens and closes.
        Vector listeners = new Vector();
	// Stream the application reads received channel data from.
	ChannelInputStream in;
	// Stream the application writes outgoing channel data to.
	ChannelOutputStream out;
        // Incremented once for every window adjust message processed.
        int windowSequence = 0;
        // Commons-logging logger; the calls that use it are currently commented out.
        /* DEBUG */Log log = LogFactory.getLog(Channel.class);

	/**
	 * Observer passed to the {@link MessageStore} when it is constructed.
	 * It accepts only MSG_CHANNEL_CLOSE messages.
	 */
	MessageObserver stickyMessages = new MessageObserver() {
		public boolean wantsNotification(Message msg) {
			return msg.getMessageId() == MultiplexedConnection.MSG_CHANNEL_CLOSE;
		}
	};

	  /**
	   * Filter used by the output stream while it waits for window space:
	   * accepts MSG_CHANNEL_WINDOW_ADJUST and MSG_CHANNEL_CLOSE messages.
	   */
	  final MessageObserver WINDOW_ADJUST_MESSAGES = new MessageObserver() {
	    public boolean wantsNotification(Message msg) {
	        final int id = msg.getMessageId();
	        return id == MultiplexedConnection.MSG_CHANNEL_WINDOW_ADJUST
	            || id == MultiplexedConnection.MSG_CHANNEL_CLOSE;
	    }
	  };

	  /**
	   * Filter used by close() while it waits for the remote side:
	   * accepts only MSG_CHANNEL_CLOSE messages.
	   */
	  final MessageObserver CHANNEL_CLOSE_MESSAGES = new MessageObserver() {
	    public boolean wantsNotification(Message msg) {
	        return msg.getMessageId() == MultiplexedConnection.MSG_CHANNEL_CLOSE;
	    }
	  };

	  /**
	   * Filter handed to the channel's input stream: accepts MSG_CHANNEL_DATA
	   * and MSG_CHANNEL_CLOSE messages.
	   */
	  final MessageObserver CHANNEL_DATA_MESSAGES = new MessageObserver() {
	    public boolean wantsNotification(Message msg) {

	        // Original author's note: access to this observer is synchronized
	        // by the ThreadSynchronizer, so the InputStream can be flagged as
	        // blocking when this method is called and released once a message
	        // has been found.
	        final int id = msg.getMessageId();
	        return id == MultiplexedConnection.MSG_CHANNEL_DATA
	            || id == MultiplexedConnection.MSG_CHANNEL_CLOSE;
	    }
	  };

	// Store that processMessages() pulls this channel's messages from. It is
	// built with this channel and stickyMessages; stickyMessages is already
	// initialised here because instance field initialisers run in textual order.
	MessageStore messageStore = new MessageStore(this, stickyMessages);

	/**
	 * Creates a channel of the given type, sets up its local data window and
	 * creates its input and output streams.
	 *
	 * @param type        channel type name, later returned by getType()
	 * @param localpacket local packet size, later returned by getLocalPacket()
	 * @param localwindow initial local window size, later returned by
	 *                    getLocalWindow(); also the size of the input buffer
	 */
	public Channel(String type, int localpacket, int localwindow) {
		this.type = type;
		// DataWindow takes (windowsize, packetsize), the same order init()
		// uses for the remote window. Passing the arguments the other way
		// round made the window report the packet size and vice versa.
		this.localwindow = new DataWindow(localwindow, localpacket);

		// The input stream sizes its buffer from localwindow, so it must be
		// created after the window.
		in = new ChannelInputStream(CHANNEL_DATA_MESSAGES);
		out = new ChannelOutputStream();
	}

	/**
	 * Attaches this channel to a connection and records the remote side's
	 * channel id and data window.
	 *
	 * @param connection   connection the channel will send through
	 * @param remoteid     channel id used by the remote side
	 * @param remotepacket remote packet size
	 * @param remotewindow initial remote window size
	 */
	public void init(MultiplexedConnection connection, int remoteid, int remotepacket, int remotewindow) {
		this.connection = connection;
		this.remoteid = remoteid;
		// DataWindow takes (windowsize, packetsize).
		this.remotewindow = new DataWindow(remotewindow, remotepacket);
	}

	/** Set by the first call to close(); later calls see it and do nothing. */
	boolean closing = false;

	/**
	 * Closes the channel. Only the first call does any work: it closes the
	 * output stream, asks the connection to close the channel, processes
	 * messages until the remote close is seen (unless the message store is
	 * already closed), then frees the channel and notifies onChannelClose()
	 * and every registered listener.
	 *
	 * If the channel has no connection (connection is only assigned by
	 * init()), the connection steps are skipped and only the local
	 * notifications run.
	 */
	public void close() {

		boolean performClose = false;

		// Only the first caller gets to run the close sequence.
		synchronized (this) {
			if (!closing) {
				performClose = closing = true;
			}
		}

		if (!performClose) {
			return;
		}

		try {
			// Close the ChannelOutputStream. That calls back into close(),
			// which returns immediately because closing is already set.
			out.close();

			// Without a connection there is nobody to notify or wait for.
			if (connection != null) {
				// Send our close message
				connection.closeChannel(this);

				synchronized (messageStore) {
					if (!messageStore.isClosed()) {
						// Wait for the other side to close the channel
						processMessages(CHANNEL_CLOSE_MESSAGES);
					}
				}
			}
		} catch (EOFException eof) {
			// Ignore: this is the message store informing us of close/eof
		} catch (IOException ex) {
			// IO error during close, so the connection has dropped
			if (connection != null) {
				connection.disconnect(1,
						"IOException during channel close: " + ex.getMessage());
			}
		} finally {
			if (connection != null) {
				connection.freeChannel(this);
			}
			onChannelClose();
			for (Enumeration e = listeners.elements(); e.hasMoreElements();) {
				((ChannelListener) e.nextElement()).onChannelClose(this);
			}
		}
	}

	/**
	 * Hook implemented by concrete channel types; nothing in this class
	 * calls it.
	 * NOTE(review): presumably invoked when the remote side asks to open the
	 * channel, with the request payload in data and the reply payload as the
	 * return value - confirm against MultiplexedConnection.
	 */
	public abstract byte[] open(byte[] data) throws IOException;

	/**
	 * Hook implemented by concrete channel types; nothing in this class
	 * calls it.
	 * NOTE(review): presumably returns the payload sent with a locally
	 * initiated open request - confirm against MultiplexedConnection.
	 */
	public abstract byte[] create() throws IOException;

        /**
         * Announces that the channel is open: calls this channel's own
         * onChannelOpen() hook first, then onChannelOpen(this) on every
         * registered listener in registration order.
         */
        void fireChannelOpen() {
            onChannelOpen();

            Enumeration registered = listeners.elements();
            while (registered.hasMoreElements()) {
                ChannelListener listener = (ChannelListener) registered.nextElement();
                listener.onChannelOpen(this);
            }
        }

	/**
	 * Subclass hook called by fireChannelOpen() before any listener is
	 * notified.
	 */
	public abstract void onChannelOpen();

	/**
	 * Subclass hook called from the finally block of close(), before the
	 * listeners are notified.
	 */
	public abstract void onChannelClose();

        /**
         * Registers a listener for channel open and close notifications.
         * A null listener is ignored.
         *
         * @param listener listener to add, may be null
         */
        public void addListener(ChannelListener listener) {
            if (listener == null) {
                return;
            }
            listeners.addElement(listener);
        }

	/** Returns the stream used to send data on this channel. */
	public OutputStream getOutputStream() {
		return out;
	}

	/** Returns the stream used to read data received on this channel. */
	public InputStream getInputStream() {
		return in;
	}

	/** Returns the channel type name given to the constructor. */
	public String getType() {
		return type;
	}

	/** Returns the space currently available in the local data window. */
	public int getLocalWindow() {
		return localwindow.available();
	}

	/** Returns the packet size of the local data window. */
	public int getLocalPacket() {
		return localwindow.getPacketSize();
	}

	/** Returns true once this channel's message store has been closed. */
	public boolean isClosed() {
		return messageStore.isClosed();
	}

	/**
	 * Grows the local data window by increment and sends a window adjust
	 * for the same amount through the connection.
	 *
	 * @param increment number of bytes to add to the local window
	 * @throws IOException declared for the send through the connection
	 */
	protected void adjustWindow(int increment) throws IOException {

            /* DEBUG *///log.info("Adjusting local window with " + increment + " bytes");
            // Update our own accounting first, then tell the remote side.
            localwindow.adjust(increment);
            connection.sendWindowAdjust(this, increment);
	}

	 /**
	  * Takes the next message accepted by messagefilter from the message
	  * store and applies it to the channel state:
	  * a window adjust grows the remote window and bumps windowSequence,
	  * channel data is copied into the input stream's buffer, and a channel
	  * close closes the message store and the channel and is reported as an
	  * EOFException. Any other message is returned untouched.
	  *
	  * @param messagefilter selects which stored messages are eligible
	  * @return the message that was processed
	  * @throws EOFException when the processed message was a channel close
	  * @throws IOException  declared for the message store and the input
	  *                      stream write
	  */
	 Message processMessages(MessageObserver messagefilter)
	     throws IOException,
	     EOFException {

	 	// Collect the next channel message from the connection protocol
	 	final Message msg = messageStore.nextMessage(messagefilter);
	 	final int messageId = msg.getMessageId();

	 	if (messageId == MultiplexedConnection.MSG_CHANNEL_WINDOW_ADJUST) {
	 		final int increment = (int) msg.readInt();
	 		remotewindow.adjust(increment);
	 		windowSequence++;
	 	} else if (messageId == MultiplexedConnection.MSG_CHANNEL_DATA) {
	 		msg.skip(4); // Skip the length
	 		in.write(msg.array(), msg.getPosition(), msg.available());
	 	} else if (messageId == MultiplexedConnection.MSG_CHANNEL_CLOSE) {
	 		messageStore.close();
	 		close();
	 		throw new EOFException("The channel is closed");
	 	}

	 	return msg;
	 }

    /**
     * Empty placeholder; nothing in this class calls it.
     * NOTE(review): the name suggests leftover IDE-generated scaffolding -
     * confirm before removing.
     */
    private void jbInit() throws Exception {
    }


    /**
     * Output side of the channel. Data is sent through the connection in
     * pieces no larger than the remote packet size and never more than the
     * remote window currently allows; when the window is exhausted the
     * writer processes window adjust messages before sending more.
     */
    class ChannelOutputStream extends OutputStream {

        boolean isEOF = false;

        /** Writes a single byte by delegating to the array write. */
        public void write(int b) throws java.io.IOException {
            byte[] single = new byte[] { (byte) b };
            write(single, 0, 1);
        }

        /**
         * Sends len bytes of buf starting at offset.
         *
         * @throws IOException if the channel is closed, or from message
         *                     processing or the connection
         */
        public synchronized void write(byte[] buf, int offset, int len) throws IOException {

            int remaining = len;
            int cursor = offset;

            do {
                // No window space left: process a window adjust (or close)
                // message before trying to send.
                if (remotewindow.available() <= 0) {
                    processMessages(WINDOW_ADJUST_MESSAGES);
                }

                if (isClosed()) {
                    throw new IOException("The channel is closed!");
                }

                // Send the smallest of: window space, packet size, bytes left.
                int chunk = Math.min(
                        Math.min(remotewindow.available(), remotewindow.getPacketSize()),
                        remaining);

                if (chunk > 0) {
                    connection.sendChannelData(Channel.this, buf, cursor, chunk);
                    remotewindow.consume(chunk);
                    remaining -= chunk;
                    cursor += chunk;
                }
            } while (remaining > 0);
        }

        /** Closing the stream closes the whole channel. */
        public void close() throws IOException {
            Channel.this.close();
        }
    }




	/**
	 * Input side of the channel. Received data is held in a circular buffer
	 * whose size is the local window space available when the stream is
	 * created. Reading or skipping data frees buffer space, and once the free
	 * window falls below half the buffer a window adjust is sent to restore it.
	 */
	class ChannelInputStream
			extends InputStream {

		/** Circular buffer holding received but unread data. */
		byte[] buffer;
		/** Number of buffered bytes that have not been read yet. */
		int unread = 0;
		/** Not used by any method of this class. */
		int position = 0;
		/** Index in buffer of the first unread byte. */
		int base = 0;
		/** Filter passed to processMessages() when this stream needs data. */
		MessageObserver messagefilter;
		/** Running total of bytes handed out by read() and skip(). */
		long transfered = 0;

		ChannelInputStream(MessageObserver messagefilter) {
			buffer = new byte[localwindow.available()];
			this.messagefilter = messagefilter;
		}

		/**
		 * Returns the number of buffered bytes. When the buffer is empty and
		 * the message store reports a matching message, that message is
		 * processed first.
		 *
		 * @return the unread byte count, or -1 if processing hit the channel
		 *         close (read() relies on this -1)
		 */
		public synchronized int available() throws IOException {
			try {
				if (unread == 0) {
					if (messageStore.hasMessage(messagefilter) != null) {
						processMessages(messagefilter);
					}
				}
				return unread;
			} catch (EOFException ex) {
				return -1;
			}
		}

		/** Closing the stream closes the whole channel. */
		public void close() {
			Channel.this.close();
		}

		/** Reads one byte; returns it as 0-255, or -1 when nothing was read. */
		public int read() throws IOException {
			byte[] b = new byte[1];
			int ret = read(b, 0, 1);
			if (ret > 0) {
				return b[0] & 0xFF;
			}
			else {
				return -1;
			}
		}

		/**
		 * Discards up to len buffered bytes without copying them. Only data
		 * already in the buffer is skipped; no messages are processed.
		 * A negative len skips nothing.
		 *
		 * NOTE(review): unlike read() and available() this method is not
		 * synchronized although it updates the same fields - confirm whether
		 * that is intentional.
		 *
		 * @param len maximum number of bytes to skip
		 * @return the number of bytes actually skipped
		 * @throws EOFException if nothing could be skipped and the channel is
		 *                      closed
		 */
		public long skip(long len) throws IOException {

			// A negative request must not move the read position: without
			// this clamp base would go backwards and unread would grow,
			// corrupting the circular buffer.
			int count;
			if (len <= 0) {
				count = 0;
			}
			else {
				count = unread < len ? unread : (int) len;
			}

			try {
				if (count == 0 && isClosed())
					throw new EOFException("The inputstream is closed");

				base = (base + count) % buffer.length;
				unread -= count;

				// Less than half the buffer is on offer to the sender:
				// open the window up to the full free space again.
				if ((unread + localwindow.available()) < (buffer.length / 2)) {
					adjustWindow(buffer.length - localwindow.available() - unread);
				}

			} finally {
				transfered += count;
			}
			return count;
		}

		/**
		 * Copies up to len buffered bytes into buf at offset. If nothing is
		 * buffered and the channel is still open, one message is processed
		 * first to obtain data.
		 *
		 * @return the number of bytes copied, or -1 once the channel is
		 *         closed and no data is left
		 */
		public synchronized int read(byte[] buf, int offset, int len) throws IOException {

			try {

				if (available() == -1)
					return -1;

				if (unread <= 0 && !isClosed()) {
					processMessages(messagefilter);
				}

				int count = unread < len ? unread : len;

				if (count == 0 && isClosed())
					return -1;

				int index = base;
				base = (base + count) % buffer.length;
				if (buffer.length - index > count) {
					// The requested bytes are contiguous in the buffer.
					System.arraycopy(buffer, index, buf, offset, count);
				}
				else {
					// The data wraps: copy the tail of the buffer, then the
					// rest from the start.
					int remaining = buffer.length - index;
					System.arraycopy(buffer, index, buf, offset, remaining);
					System.arraycopy(buffer, 0, buf, offset + remaining,
							count - remaining);
				}

				unread -= count;

				// Less than half the buffer is on offer to the sender:
				// open the window up to the full free space again.
				if ((unread + localwindow.available()) < (buffer.length / 2)) {
					adjustWindow(buffer.length - localwindow.available() - unread);
				}

				transfered += count;

				return count;
			}
			catch (EOFException ex) {
				return -1;
			}
		}

		/**
		 * Appends received data to the circular buffer and consumes the same
		 * amount of local window. Called by processMessages() for channel
		 * data messages.
		 *
		 * @throws IOException if len exceeds the available local window; the
		 *                     connection is disconnected before throwing
		 */
		void write(byte[] buf, int offset, int len) throws IOException {

			if (localwindow.available() < len) {

				/* DEBUG *///log.info("Window space exceeded!!");
				connection.disconnect(0,
						"Received data exceeding current window space");
				throw new IOException("Window space exceeded");
			}

			int i = 0;
			int index;
			int count;
			while (i < len) {
				// Copy data up to the end of the array and start back
				// at the beginning
				index = (base + unread) % buffer.length;
				count = ((buffer.length - index < len - i)
						? buffer.length - index
						: len - i);
				System.arraycopy(buf, offset + i, buffer, index, count);
				unread += count;
				i += count;
			}

			localwindow.consume(len);

		}
	}

	  /**
	   * Pair of counters describing one direction of a channel: the window
	   * space currently available and the packet size. The window is
	   * reduced with consume() and increased with adjust(); the packet size
	   * never changes after construction.
	   */
	  class DataWindow {
	    /** Space currently available in the window. */
	    int windowsize;
	    /** Packet size given at construction. */
	    int packetsize;

	    /**
	     * @param windowsize initial window space
	     * @param packetsize packet size
	     */
	    DataWindow(int windowsize, int packetsize) {
	      this.packetsize = packetsize;
	      this.windowsize = windowsize;
	    }

	    /** Returns the space currently available in the window. */
	    int available() {
	      return windowsize;
	    }

	    /** Returns the packet size. */
	    int getPacketSize() {
	      return packetsize;
	    }

	    /** Adds count to the available window space. */
	    void adjust(int count) {
	      windowsize = windowsize + count;
	    }

	    /** Removes count from the available window space. */
	    void consume(int count) {
	      windowsize = windowsize - count;
	    }
	  }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -