⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 channelmanager.java

📁 thinking in java4 src
💻 JAVA
📖 第 1 页 / 共 3 页
字号:

package ch.ethz.ssh2.channel;

import java.io.IOException;
import java.util.HashMap;
import java.util.Vector;

import ch.ethz.ssh2.ChannelCondition;
import ch.ethz.ssh2.log.Logger;
import ch.ethz.ssh2.packets.PacketChannelOpenConfirmation;
import ch.ethz.ssh2.packets.PacketChannelOpenFailure;
import ch.ethz.ssh2.packets.PacketGlobalCancelForwardRequest;
import ch.ethz.ssh2.packets.PacketGlobalForwardRequest;
import ch.ethz.ssh2.packets.PacketOpenDirectTCPIPChannel;
import ch.ethz.ssh2.packets.PacketOpenSessionChannel;
import ch.ethz.ssh2.packets.PacketSessionExecCommand;
import ch.ethz.ssh2.packets.PacketSessionPtyRequest;
import ch.ethz.ssh2.packets.PacketSessionStartShell;
import ch.ethz.ssh2.packets.PacketSessionSubsystemRequest;
import ch.ethz.ssh2.packets.PacketSessionX11Request;
import ch.ethz.ssh2.packets.Packets;
import ch.ethz.ssh2.packets.TypesReader;
import ch.ethz.ssh2.transport.MessageHandler;
import ch.ethz.ssh2.transport.TransportManager;

/**
 * ChannelManager. Please read the comments in Channel.java.
 * <p>
 * Besides the crypto part, this is the core of the library.
 * 
 * @author Christian Plattner, plattner@inf.ethz.ch
 * @version $Id: ChannelManager.java,v 1.13 2006/02/14 15:17:37 cplattne Exp $
 */
public class ChannelManager implements MessageHandler
{
	/** Logger for this class. */
	private static final Logger log = Logger.getLogger(ChannelManager.class);

	/**
	 * Registered X11 fake cookies, mapping the hex cookie string to its
	 * X11ServerData. The visible accessors synchronize on the map itself.
	 */
	private HashMap x11_magic_cookies = new HashMap();

	/** Transport used to send every outgoing packet; set once in the constructor. */
	private TransportManager tm;

	/**
	 * All channels currently registered with this manager (elements are
	 * Channel objects). This object is also used as the monitor that guards
	 * nextLocalChannel and the global request counters below.
	 */
	private Vector channels = new Vector();
	/** Next value handed out by addChannel(); starts at 100. */
	private int nextLocalChannel = 100;
	/*
	 * Makes waitForGlobalSuccessOrFailure() abort with an IOException when
	 * true. NOTE(review): where it is set is not visible in this part of the
	 * file.
	 */
	private boolean shutdown = false;
	/*
	 * Outcome counters for global requests. They are reset to zero before a
	 * global request is sent and polled by waitForGlobalSuccessOrFailure().
	 * NOTE(review): presumably incremented by the message handler when the
	 * server replies -- not visible in this part of the file.
	 */
	private int globalSuccessCounter = 0;
	private int globalFailedCounter = 0;

	/**
	 * Active remote port forwardings, keyed by Integer(bindPort) with
	 * RemoteForwardingData values. Accessed while synchronized on the map.
	 */
	private HashMap remoteForwardings = new HashMap();

	/* NOTE(review): not used in the visible part of the file; presumably holds listener threads -- confirm. */
	private Vector listenerThreads = new Vector();

	/* NOTE(review): not used in the visible part of the file -- confirm its role against the rest of the class. */
	private boolean listenerThreadsAllowed = true;

	/**
	 * Creates a channel manager on top of the given transport and registers
	 * it there as a message handler.
	 *
	 * @param tm
	 *            the transport manager used for all outgoing packets
	 */
	public ChannelManager(TransportManager tm)
	{
		/* Store the transport before registering, so that it is already set when the handler becomes reachable. */
		this.tm = tm;
		/* NOTE(review): 80 and 100 presumably bound the range of message types routed to this handler -- confirm against TransportManager. */
		tm.registerMessageHandler(this, 80, 100);
	}

	/**
	 * Looks up a registered channel by its local channel number.
	 * 
	 * @param id
	 *            the local channel ID to search for
	 * @return the first registered channel whose localID equals id, or
	 *         <code>null</code> if no such channel is registered
	 */
	private Channel getChannel(int id)
	{
		synchronized (channels)
		{
			/* The list cannot change while we hold its monitor. */
			final int count = channels.size();
			int idx = 0;

			while (idx < count)
			{
				Channel candidate = (Channel) channels.get(idx);

				if (candidate.localID == id)
					return candidate;

				idx++;
			}

			return null;
		}
	}

	/**
	 * Unregisters the first channel whose localID equals the given ID. Does
	 * nothing if no registered channel matches.
	 * 
	 * @param id
	 *            the local channel ID of the channel to drop
	 */
	private void removeChannel(int id)
	{
		synchronized (channels)
		{
			int idx = 0;

			while (idx < channels.size())
			{
				Channel candidate = (Channel) channels.get(idx);

				if (candidate.localID == id)
				{
					/* Only the first match is removed. */
					channels.remove(idx);
					return;
				}

				idx++;
			}
		}
	}

	/**
	 * Registers a channel and allocates a fresh local channel number for it.
	 * <p>
	 * The number is only returned; this method does not store it in the
	 * channel.
	 * 
	 * @param c
	 *            the channel to register
	 * @return the newly allocated local channel number
	 */
	private int addChannel(Channel c)
	{
		synchronized (channels)
		{
			channels.add(c);

			final int assignedID = nextLocalChannel;
			nextLocalChannel = assignedID + 1;

			return assignedID;
		}
	}

	/**
	 * Blocks until the channel has left the STATE_OPENING state.
	 * <p>
	 * If the channel did not end up in STATE_OPEN it is unregistered from
	 * this manager and an IOException is thrown.
	 * <p>
	 * The wait itself cannot be aborted by interrupting the thread, but an
	 * interrupt that arrives while waiting is no longer lost: the thread's
	 * interrupt status is restored before this method returns or throws.
	 * 
	 * @param c
	 *            the channel to wait for
	 * @throws IOException
	 *             if the channel could not be opened; the message carries the
	 *             channel's close reason or, if there is none, its state
	 */
	private void waitUntilChannelOpen(Channel c) throws IOException
	{
		boolean interrupted = false;

		try
		{
			synchronized (c)
			{
				while (c.state == Channel.STATE_OPENING)
				{
					try
					{
						c.wait();
					}
					catch (InterruptedException e)
					{
						/*
						 * Keep waiting, but remember the request. Re-asserting the
						 * flag right here would make the next wait() throw again
						 * immediately and turn this loop into a busy spin.
						 */
						interrupted = true;
					}
				}

				if (c.state != Channel.STATE_OPEN)
				{
					/* Acquires the channel list monitor while holding the channel monitor. */
					removeChannel(c.localID);

					String detail = c.getReasonClosed();

					if (detail == null)
						detail = "state: " + c.state;

					throw new IOException("Could not open channel (" + detail + ")");
				}
			}
		}
		finally
		{
			if (interrupted)
				Thread.currentThread().interrupt();
		}
	}

	/**
	 * Blocks until one of the global request counters becomes non-zero, i.e.
	 * until the outcome of the pending global request is known.
	 * <p>
	 * The counters are not reset here; callers reset them before sending the
	 * request.
	 * <p>
	 * The wait itself cannot be aborted by interrupting the thread, but an
	 * interrupt that arrives while waiting is no longer lost: the thread's
	 * interrupt status is restored before this method returns or throws.
	 * 
	 * @throws IOException
	 *             if the manager is shutting down while no outcome is known
	 *             yet, or if the failure counter is non-zero
	 */
	private final void waitForGlobalSuccessOrFailure() throws IOException
	{
		boolean interrupted = false;

		try
		{
			synchronized (channels)
			{
				while ((globalSuccessCounter == 0) && (globalFailedCounter == 0))
				{
					if (shutdown)
					{
						throw new IOException("The connection is being shutdown");
					}

					try
					{
						channels.wait();
					}
					catch (InterruptedException e)
					{
						/* Keep waiting, but remember the request (see finally). */
						interrupted = true;
					}
				}

				if (globalFailedCounter != 0)
				{
					throw new IOException("The server denied the request (did you enable port forwarding?)");
				}

				/*
				 * Reaching this point means globalFailedCounter == 0, and the
				 * loop only exits when at least one counter is non-zero, so
				 * globalSuccessCounter != 0 is guaranteed while we hold the
				 * lock. The former "Illegal state." check was unreachable.
				 */
			}
		}
		finally
		{
			if (interrupted)
				Thread.currentThread().interrupt();
		}
	}

	/**
	 * Blocks until one of the channel's request counters becomes non-zero,
	 * i.e. until the outcome of the pending channel request is known.
	 * <p>
	 * The counters are not reset here.
	 * <p>
	 * The wait itself cannot be aborted by interrupting the thread, but an
	 * interrupt that arrives while waiting is no longer lost: the thread's
	 * interrupt status is restored before this method returns or throws.
	 * 
	 * @param c
	 *            the channel whose pending request is awaited
	 * @throws IOException
	 *             if the channel is not in STATE_OPEN while no outcome is
	 *             known yet, or if the channel's failure counter is non-zero
	 */
	private final void waitForChannelSuccessOrFailure(Channel c) throws IOException
	{
		boolean interrupted = false;

		try
		{
			synchronized (c)
			{
				while ((c.successCounter == 0) && (c.failedCounter == 0))
				{
					if (c.state != Channel.STATE_OPEN)
					{
						String detail = c.getReasonClosed();

						if (detail == null)
							detail = "state: " + c.state;

						throw new IOException("This SSH2 channel is not open (" + detail + ")");
					}

					try
					{
						c.wait();
					}
					catch (InterruptedException e)
					{
						/* Keep waiting, but remember the request (see finally). */
						interrupted = true;
					}
				}

				if (c.failedCounter != 0)
				{
					throw new IOException("The server denied the request.");
				}
			}
		}
		finally
		{
			if (interrupted)
				Thread.currentThread().interrupt();
		}
	}

	/**
	 * Associates X11 server data with a fake cookie, replacing any entry
	 * previously stored under the same cookie.
	 * 
	 * @param hexFakeCookie
	 *            the fake cookie (hex string) used as the lookup key
	 * @param data
	 *            the X11 server data to store for that cookie
	 */
	public void registerX11Cookie(String hexFakeCookie, X11ServerData data)
	{
		final HashMap cookies = x11_magic_cookies;

		synchronized (cookies)
		{
			cookies.put(hexFakeCookie, data);
		}
	}

	/**
	 * Removes a fake cookie from the registry and, optionally, force-closes
	 * every registered channel that carries this cookie.
	 * 
	 * @param hexFakeCookie
	 *            the fake cookie (hex string) to remove; must not be
	 *            <code>null</code>
	 * @param killChannels
	 *            if <code>true</code>, also close all channels whose
	 *            hexX11FakeCookie equals the given cookie
	 * @throws IllegalStateException
	 *             if hexFakeCookie is <code>null</code>
	 */
	public void unRegisterX11Cookie(String hexFakeCookie, boolean killChannels)
	{
		if (hexFakeCookie == null)
			throw new IllegalStateException("hexFakeCookie may not be null");

		synchronized (x11_magic_cookies)
		{
			x11_magic_cookies.remove(hexFakeCookie);
		}

		if (!killChannels)
			return;

		if (log.isEnabled())
			log.log(50, "Closing all X11 channels for the given fake cookie");

		/* Work on a snapshot so that the channel list lock is not held while closing. */
		final Object[] snapshot;

		synchronized (channels)
		{
			snapshot = channels.toArray();
		}

		for (int idx = 0; idx < snapshot.length; idx++)
		{
			Channel candidate = (Channel) snapshot[idx];
			boolean cookieMatches;

			synchronized (candidate)
			{
				cookieMatches = hexFakeCookie.equals(candidate.hexX11FakeCookie);
			}

			if (cookieMatches)
			{
				try
				{
					closeChannel(candidate, "Closing X11 channel since the corresponding session is closing", true);
				}
				catch (IOException ignored)
				{
					/* Best effort: a failure on one channel must not stop the others from being closed. */
				}
			}
		}
	}

	/**
	 * Returns the X11 server data registered for a fake cookie.
	 * 
	 * @param hexFakeCookie
	 *            the fake cookie (hex string) to look up, may be
	 *            <code>null</code>
	 * @return the registered data, or <code>null</code> if the cookie is
	 *         <code>null</code> or not registered
	 */
	public X11ServerData checkX11Cookie(String hexFakeCookie)
	{
		synchronized (x11_magic_cookies)
		{
			if (hexFakeCookie == null)
				return null;

			Object registered = x11_magic_cookies.get(hexFakeCookie);
			return (X11ServerData) registered;
		}
	}

	/**
	 * Force-closes every channel that is registered at the time of the call.
	 * Errors while closing an individual channel are ignored so that the
	 * remaining channels are still processed.
	 */
	public void closeAllChannels()
	{
		if (log.isEnabled())
			log.log(50, "Closing all channels");

		/* Work on a snapshot so that the channel list lock is not held while closing. */
		final Object[] snapshot;

		synchronized (channels)
		{
			snapshot = channels.toArray();
		}

		for (int idx = 0; idx < snapshot.length; idx++)
		{
			try
			{
				closeChannel((Channel) snapshot[idx], "Closing all channels", true);
			}
			catch (IOException ignored)
			{
				/* Best effort: keep going with the next channel. */
			}
		}
	}

	/**
	 * Records the close reason on the channel, wakes up all threads waiting
	 * on it and sends SSH_MSG_CHANNEL_CLOSE to the peer unless a close
	 * message has already been sent for this channel.
	 * 
	 * @param c
	 *            the channel to close
	 * @param reason
	 *            text stored on the channel as the reason for closing
	 * @param force
	 *            if <code>true</code>, the channel is additionally put into
	 *            STATE_CLOSED and marked as EOF right away
	 * @throws IOException
	 *             if sending the close message fails
	 */
	public void closeChannel(Channel c, String reason, boolean force) throws IOException
	{
		final byte[] closePacket = new byte[5];

		synchronized (c)
		{
			if (force)
			{
				c.state = Channel.STATE_CLOSED;
				c.EOF = true;
			}

			c.setReasonClosed(reason);

			/* Message type followed by the remote channel number, most significant byte first. */
			closePacket[0] = Packets.SSH_MSG_CHANNEL_CLOSE;
			for (int i = 0; i < 4; i++)
				closePacket[1 + i] = (byte) (c.remoteID >> (24 - 8 * i));

			c.notifyAll();
		}

		boolean sentNow = false;

		synchronized (c.channelSendLock)
		{
			/* The close message goes out at most once per channel. */
			if (!c.closeMessageSent)
			{
				tm.sendMessage(closePacket);
				c.closeMessageSent = true;
				sentNow = true;
			}
		}

		if (sentNow && log.isEnabled())
			log.log(50, "Sent SSH_MSG_CHANNEL_CLOSE (channel " + c.localID + ")");
	}

	/**
	 * Sends SSH_MSG_CHANNEL_EOF for the given channel. Nothing is sent if the
	 * channel is not in STATE_OPEN or if a close message has already been
	 * sent for it.
	 * 
	 * @param c
	 *            the channel on which to signal EOF
	 * @throws IOException
	 *             if sending the message fails
	 */
	public void sendEOF(Channel c) throws IOException
	{
		final byte[] eofPacket = new byte[5];

		synchronized (c)
		{
			if (c.state != Channel.STATE_OPEN)
				return;

			/* Message type followed by the remote channel number, most significant byte first. */
			eofPacket[0] = Packets.SSH_MSG_CHANNEL_EOF;
			for (int i = 0; i < 4; i++)
				eofPacket[1 + i] = (byte) (c.remoteID >> (24 - 8 * i));
		}

		boolean sent = false;

		synchronized (c.channelSendLock)
		{
			/* Never send anything on a channel after its close message. */
			if (!c.closeMessageSent)
			{
				tm.sendMessage(eofPacket);
				sent = true;
			}
		}

		if (sent && log.isEnabled())
			log.log(50, "Sent EOF (Channel " + c.localID + "/" + c.remoteID + ")");
	}

	/**
	 * Moves a channel from STATE_OPENING to STATE_OPEN and sends the
	 * corresponding open confirmation to the peer. Does nothing if the
	 * channel is not in STATE_OPENING; the confirmation is not sent if a
	 * close message has already been sent for the channel.
	 * 
	 * @param c
	 *            the channel to confirm
	 * @throws IOException
	 *             if sending the confirmation fails
	 */
	public void sendOpenConfirmation(Channel c) throws IOException
	{
		final PacketChannelOpenConfirmation confirmation;

		synchronized (c)
		{
			if (c.state != Channel.STATE_OPENING)
				return;

			c.state = Channel.STATE_OPEN;

			confirmation = new PacketChannelOpenConfirmation(c.remoteID, c.localID, c.localWindow,
					c.localMaxPacketSize);
		}

		synchronized (c.channelSendLock)
		{
			/* Never send anything on a channel after its close message. */
			if (!c.closeMessageSent)
				tm.sendMessage(confirmation.getPayload());
		}
	}

	/**
	 * Sends <code>len</code> bytes of <code>buffer</code>, starting at
	 * <code>pos</code>, as one or more SSH_MSG_CHANNEL_DATA messages.
	 * <p>
	 * Each message carries at most as many bytes as the remote window
	 * currently allows and at most the estimated maximum data length derived
	 * from the remote maximum packet size. While the remote window is zero
	 * the calling thread waits on the channel object.
	 * <p>
	 * NOTE(review): interrupts received while waiting are silently discarded.
	 * <p>
	 * NOTE(review): pos and len are not validated here; an invalid range makes
	 * System.arraycopy throw after the remote window has already been
	 * decremented for that chunk.
	 * 
	 * @param c
	 *            the channel to send on
	 * @param buffer
	 *            the array holding the data
	 * @param pos
	 *            index of the first byte to send
	 * @param len
	 *            number of bytes to send; nothing is sent if it is not
	 *            positive
	 * @throws IOException
	 *             if the channel is closed or in a state other than
	 *             STATE_OPEN, if a close message has already been sent for
	 *             it, or if sending fails
	 */
	public void sendData(Channel c, byte[] buffer, int pos, int len) throws IOException
	{
		while (len > 0)
		{
			int thislen = 0;
			byte[] msg;

			synchronized (c)
			{
				/* Wait until the channel has a non-zero remote window, re-checking its state after every wake-up. */
				while (true)
				{
					if (c.state == Channel.STATE_CLOSED)
						throw new IOException("SSH channel is closed. (" + c.getReasonClosed() + ")");

					if (c.state != Channel.STATE_OPEN)
						throw new IOException("SSH channel in strange state. (" + c.state + ")");

					if (c.remoteWindow != 0)
						break;

					try
					{
						c.wait();
					}
					catch (InterruptedException ignore)
					{
					}
				}

				/* len > 0, no sign extension can happen when comparing */

				thislen = (c.remoteWindow >= len) ? len : (int) c.remoteWindow;

				/* 9 is the size of the header written below: 1 type byte + 4 channel bytes + 4 length bytes. */
				int estimatedMaxDataLen = c.remoteMaxPacketSize - (tm.getPacketOverheadEstimate() + 9);

				/* The worst case scenario =) a true bottleneck */

				if (estimatedMaxDataLen <= 0)
				{
					estimatedMaxDataLen = 1;
				}

				if (thislen > estimatedMaxDataLen)
					thislen = estimatedMaxDataLen;

				/* Reserve the window space while still holding the channel lock. */
				c.remoteWindow -= thislen;

				msg = new byte[1 + 8 + thislen];

				/* Header: message type, remote channel number, data length (both most significant byte first). */
				msg[0] = Packets.SSH_MSG_CHANNEL_DATA;
				msg[1] = (byte) (c.remoteID >> 24);
				msg[2] = (byte) (c.remoteID >> 16);
				msg[3] = (byte) (c.remoteID >> 8);
				msg[4] = (byte) (c.remoteID);
				msg[5] = (byte) (thislen >> 24);
				msg[6] = (byte) (thislen >> 16);
				msg[7] = (byte) (thislen >> 8);
				msg[8] = (byte) (thislen);

				System.arraycopy(buffer, pos, msg, 9, thislen);
			}

			/* The send happens under the send lock only, after the channel lock has been released. */
			synchronized (c.channelSendLock)
			{
				if (c.closeMessageSent == true)
					throw new IOException("SSH channel is closed. (" + c.getReasonClosed() + ")");

				tm.sendMessage(msg);
			}

			/* Advance past the chunk that was just sent. */
			pos += thislen;
			len -= thislen;
		}
	}

	/**
	 * Asks the server to forward a remote port and blocks until the server
	 * has answered.
	 * <p>
	 * The forwarding is recorded locally under its bind port before the
	 * request is sent. If sending the request or waiting for the answer
	 * fails, the record is removed again so that the same port can be
	 * requested later.
	 * <p>
	 * Only the bind address and bind port are sent to the server; the target
	 * address and port are kept in the local record.
	 * 
	 * @param bindAddress
	 *            the address the server is asked to bind to
	 * @param bindPort
	 *            the port the server is asked to bind to; also the key of the
	 *            local forwarding record
	 * @param targetAddress
	 *            the target address stored with the forwarding
	 * @param targetPort
	 *            the target port stored with the forwarding
	 * @return <code>bindPort</code>
	 * @throws IOException
	 *             if a forwarding for <code>bindPort</code> is already
	 *             recorded, if the request cannot be sent, if the server
	 *             denies it, or if the connection is shutting down
	 */
	public int requestGlobalForward(String bindAddress, int bindPort, String targetAddress, int targetPort)
			throws IOException
	{
		RemoteForwardingData rfd = new RemoteForwardingData();

		rfd.bindAddress = bindAddress;
		rfd.bindPort = bindPort;
		rfd.targetAddress = targetAddress;
		rfd.targetPort = targetPort;

		/* Kept outside the synchronized block so the rollback below can remove by the same key. */
		final Integer key = new Integer(bindPort);

		synchronized (remoteForwardings)
		{
			if (remoteForwardings.get(key) != null)
			{
				throw new IOException("There is already a forwarding for remote port " + bindPort);
			}

			remoteForwardings.put(key, rfd);
		}

		try
		{
			synchronized (channels)
			{
				globalSuccessCounter = globalFailedCounter = 0;
			}

			/* NOTE(review): the first argument is presumably the want-reply flag -- confirm against PacketGlobalForwardRequest. */
			PacketGlobalForwardRequest pgf = new PacketGlobalForwardRequest(true, bindAddress, bindPort);
			tm.sendMessage(pgf.getPayload());

			if (log.isEnabled())
				log.log(50, "Requesting a remote forwarding ('" + bindAddress + "', " + bindPort + ")");

			waitForGlobalSuccessOrFailure();
		}
		catch (IOException e)
		{
			/*
			 * Roll back the local record. The map is keyed by the bind port, so
			 * the entry has to be removed by key; passing the value to remove()
			 * would leave it in place. The send is inside the try as well, so a
			 * failed send does not leak the record either.
			 */
			synchronized (remoteForwardings)
			{
				remoteForwardings.remove(key);
			}
			throw e;
		}

		return bindPort;
	}

	/**
	 * Asks the server to cancel a remote port forwarding and blocks until the
	 * server has answered. On success the local record of the forwarding is
	 * removed, so the port can be requested again later.
	 * 
	 * @param bindPort
	 *            the remote port whose forwarding is to be cancelled
	 * @throws IOException
	 *             if no forwarding is recorded for <code>bindPort</code>, if
	 *             the request cannot be sent, if the server denies it, or if
	 *             the connection is shutting down; in all of these cases the
	 *             local record is left untouched
	 */
	public void requestCancelGlobalForward(int bindPort) throws IOException
	{
		/* The forwarding map is keyed by the bind port. */
		final Integer key = new Integer(bindPort);

		RemoteForwardingData rfd = null;

		synchronized (remoteForwardings)
		{
			rfd = (RemoteForwardingData) remoteForwardings.get(key);

			if (rfd == null)
				throw new IOException("Sorry, there is no known remote forwarding for remote port " + bindPort);
		}

		synchronized (channels)
		{
			globalSuccessCounter = globalFailedCounter = 0;
		}

		/* NOTE(review): the first argument is presumably the want-reply flag -- confirm against PacketGlobalCancelForwardRequest. */
		PacketGlobalCancelForwardRequest pgcf = new PacketGlobalCancelForwardRequest(true, rfd.bindAddress,
				rfd.bindPort);
		tm.sendMessage(pgcf.getPayload());

		if (log.isEnabled())
			log.log(50, "Requesting cancelation of remote forward ('" + rfd.bindAddress + "', " + rfd.bindPort + ")");

		waitForGlobalSuccessOrFailure();

		/* Only now we are sure that no more forwarded connections will arrive */

		synchronized (remoteForwardings)
		{
			/* Remove by key: passing the value to remove() would leave the entry in the map. */
			remoteForwardings.remove(key);
		}
	}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -