📄 multiplexedconnection.java
字号:
/*
* Created on 08-May-2005
*
* TODO To change the template for this generated file go to
* Window - Preferences - Java - Code Style - Code Templates
*/
package com.maverick.multiplex;
import java.io.*;
import java.util.Vector;
import java.util.Enumeration;
/**
 * Allows multiple channels to operate over a single pair of streams.
 *
 * <p>
 * A dedicated reader thread (see {@link Runner}) is started by the
 * constructor. It reads messages framed as a 4-byte length followed by that
 * many bytes, and dispatches them by message id: channel-open requests are
 * answered directly, every other channel message is queued on the target
 * channel's message store, and a disconnect message stops the thread.
 *
 * <p>
 * Outgoing messages are serialised through the synchronized
 * {@link #sendMessage(Packet)}; channel slot bookkeeping is guarded by the
 * connection's monitor.
 *
 * @author lee
 */
public class MultiplexedConnection {

    /** Slot table; the index of a channel in this array is its local channel id. */
    Channel[] channels;

    /** Stored from the constructor; not otherwise used by this class. */
    int connectionLimit;

    /** Stored from the constructor; not otherwise used by this class. */
    int inactivityTimeout;

    DataInputStream in;
    DataOutputStream out;

    /**
     * Loop flag for the reader thread. Volatile because {@link #stop()} may be
     * called from a thread other than the one polling it.
     */
    volatile boolean running = false;

    /** Creates channels for remote open requests; may be <code>null</code>. */
    ChannelFactory factory;

    /** Number of channels ever allocated on this connection (never decremented). */
    int totalChannels;

    /** The reader thread executing {@link Runner}. */
    Thread thread;

    /** Channels currently holding a slot in {@link #channels}. */
    Vector activeChannels = new Vector();

    /** Registered {@link MultiplexedConnectionListener}s. */
    Vector listeners = new Vector();

    /* DEBUG */static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
    /* DEBUG */ .getLog(MultiplexedConnection.class);

    public static final int MSG_CHANNEL_OPEN = 1;
    public static final int MSG_CHANNEL_OPEN_CONFIRMATION = 2;
    public static final int MSG_CHANNEL_OPEN_FAILURE = 3;
    public static final int MSG_CHANNEL_DATA = 4;
    public static final int MSG_CHANNEL_WINDOW_ADJUST = 5;
    public static final int MSG_CHANNEL_CLOSE = 6;
    public static final int MSG_DISCONNECT = 7;

    /**
     * Observer used by {@link #openChannel(Channel)} to pick the reply to a
     * channel-open request (confirmation or failure) out of the channel's
     * message store.
     */
    MessageObserver channelOpenMessages = new MessageObserver() {
        public boolean wantsNotification(Message msg) {
            switch (msg.getMessageId()) {
            case MSG_CHANNEL_OPEN_CONFIRMATION:
            case MSG_CHANNEL_OPEN_FAILURE:
                return true;
            default:
                return false;
            }
        }
    };

    /**
     * Creates the connection and immediately starts its reader thread.
     *
     * @param maxChannels size of the channel slot table; values &lt;= 0
     *            select a default of 100
     * @param connectionLimit stored only
     * @param inactivityTimeout stored only
     * @param in stream messages are read from
     * @param out stream messages are written to
     * @param factory creates channels for remote open requests; when
     *            <code>null</code> every remote open request is rejected
     */
    public MultiplexedConnection(int maxChannels, int connectionLimit, int inactivityTimeout, InputStream in, OutputStream out,
                    ChannelFactory factory) {
        channels = new Channel[maxChannels > 0 ? maxChannels : 100];
        this.connectionLimit = connectionLimit;
        this.inactivityTimeout = inactivityTimeout;
        this.in = new DataInputStream(in);
        this.out = new DataOutputStream(out);
        this.factory = factory;
        this.thread = new Thread(new Runner());
        // Raise the flag before the thread starts so that a stop() issued
        // right after construction cannot be overwritten by the reader thread.
        running = true;
        thread.start();
    }

    /**
     * @return the number of channels ever allocated on this connection
     */
    public int getTotalChannelCount() {
        return totalChannels;
    }

    /**
     * @return the number of channels currently allocated
     */
    public int getActiveChannelCount() {
        return activeChannels.size();
    }

    /**
     * @return a snapshot array of the currently allocated channels
     */
    public synchronized Channel[] getActiveChannels() {
        Channel[] tmp = new Channel[activeChannels.size()];
        activeChannels.copyInto(tmp);
        return tmp;
    }

    /**
     * Registers a listener that is told when the reader thread terminates.
     *
     * @param listener the listener; <code>null</code> is ignored
     */
    public void addListener(MultiplexedConnectionListener listener) {
        if (listener != null)
            listeners.addElement(listener);
    }

    /**
     * Asks the reader thread to terminate. When called from another thread
     * the reader thread is also interrupted.
     */
    public void stop() {
        /* DEBUG */log.info("Shutting down multiplexed connection thread");
        running = false;
        if (!Thread.currentThread().equals(thread))
            thread.interrupt();
    }

    /**
     * Handles a remote request to open a channel: creates the channel through
     * the factory, allocates a local slot and replies with either a
     * confirmation or a failure.
     *
     * @param msg the channel-open message, positioned after the message id
     * @throws IOException if the message cannot be parsed, the channel fails
     *             to open or the reply cannot be sent
     */
    protected void onChannelOpen(Message msg) throws IOException {
        String type = msg.readString();
        int remoteid = (int) msg.readInt();
        int remotepacket = (int) msg.readInt();
        int remotewindow = (int) msg.readInt();
        // Whatever is left in the message is channel-type specific open data.
        byte[] data = new byte[msg.available()];
        msg.read(data);

        if (factory == null) {
            sendChannelOpenFailure(remoteid, "This connection does not support the opening of channels");
            return;
        }

        Channel channel = factory.createChannel(type);
        if (channel == null) {
            sendChannelOpenFailure(remoteid, "Failed to create channel of type " + type);
            return;
        }

        channel.init(this, remoteid, remotepacket, remotewindow);

        if (allocateChannel(channel) == -1) {
            sendChannelOpenFailure(remoteid, "Too many channels already open");
            // The channel was rejected, so it must not be reported as open.
            return;
        }

        data = channel.open(data);
        sendChannelOpenConfirmation(channel, data);
        channel.fireChannelOpen();
    }

    /**
     * Queues a channel-level message on the message store of the channel it
     * is addressed to. Messages for unknown or out-of-range channel ids are
     * logged and dropped.
     *
     * @param msg the message, positioned at the recipient channel id
     * @throws IOException if the channel id cannot be read
     */
    protected void onChannelMessage(Message msg) throws IOException {
        int channelid = (int) msg.readInt();
        // The id comes off the wire: validate it before indexing so that a bad
        // peer cannot kill the reader thread with an unchecked exception.
        Channel channel = null;
        if (channelid >= 0 && channelid < channels.length) {
            channel = channels[channelid];
        }
        if (channel != null) {
            channel.messageStore.addMessage(msg);
        } else {
            /* DEBUG */log.warn("Message received for non-existent channel id " + channelid);
        }
    }

    /**
     * Handles a remote disconnect by stopping the reader thread.
     *
     * @param msg the disconnect message, positioned at the reason code
     * @throws IOException if the message cannot be parsed
     */
    protected void onDisconnect(Message msg) throws IOException {
        // The reason code precedes the description and has to be consumed.
        msg.readInt();
        String desc = msg.readString();
        /* DEBUG */log.info("Remote disconnected:" + desc);
        stop();
    }

    /**
     * Places the channel in the first free slot and records the slot index as
     * the channel's local id.
     *
     * @return the allocated channel id, or -1 if every slot is in use
     */
    private synchronized int allocateChannel(Channel channel) {
        for (int i = 0; i < channels.length; i++) {
            if (channels[i] == null) {
                channels[i] = channel;
                activeChannels.addElement(channel);
                totalChannels++;
                channel.channelid = i;
                return i;
            }
        }
        return -1;
    }

    /**
     * Releases the slot held by the channel. The slot is only cleared while it
     * still refers to this channel, so a repeated call cannot evict another
     * channel that has since been given the same id.
     */
    synchronized void freeChannel(Channel channel) {
        int id = channel.channelid;
        if (id >= 0 && id < channels.length && channels[id] == channel) {
            channels[id] = null;
        }
        activeChannels.removeElement(channel);
    }

    /**
     * Opens a channel to the remote side and blocks until the remote side
     * confirms or rejects it.
     *
     * <p>
     * This method deliberately does not hold the connection's monitor while
     * waiting for the reply: the reader thread needs that monitor to handle
     * an incoming channel-open request and to send its answer, so holding it
     * here could prevent the awaited reply from ever being delivered.
     *
     * @param channel the channel to open
     * @throws IOException if no slot is free, the request cannot be sent, or
     *             the remote side rejects the channel; in the latter two
     *             cases the allocated slot is released again
     */
    public void openChannel(Channel channel) throws IOException {
        byte[] data = channel.create();
        if (allocateChannel(channel) == -1) {
            throw new IOException("Failed to allocate channel: too many active channels");
        }

        boolean opened = false;
        try {
            Packet msg = new Packet();
            msg.write(MSG_CHANNEL_OPEN);
            msg.writeString(channel.getType());
            msg.writeInt(channel.channelid);
            msg.writeInt(channel.getLocalPacket());
            msg.writeInt(channel.getLocalWindow());
            if (data != null)
                msg.write(data);
            sendMessage(msg);

            Message reply = channel.messageStore.nextMessage(channelOpenMessages);
            switch (reply.getMessageId()) {
            case MSG_CHANNEL_OPEN_CONFIRMATION:
                int remoteid = (int) reply.readInt();
                int remotepacket = (int) reply.readInt();
                int remotewindow = (int) reply.readInt();
                channel.init(this, remoteid, remotepacket, remotewindow);
                opened = true;
                channel.fireChannelOpen();
                break;
            case MSG_CHANNEL_OPEN_FAILURE:
                String desc = reply.readString();
                throw new IOException(desc);
            default:
                throw new IOException("Unexpected reply in channel open procedure");
            }
        } finally {
            if (!opened) {
                // Do not leak the slot of a channel that never became open.
                freeChannel(channel);
            }
        }
    }

    /**
     * Sends a block of data on the given channel.
     *
     * @param channel the channel whose remote id addresses the message
     * @param data buffer holding the data
     * @param off offset of the first byte to send
     * @param len number of bytes to send
     * @throws IOException if the message cannot be written
     */
    public void sendChannelData(Channel channel, byte[] data, int off, int len) throws IOException {
        Packet msg = new Packet();
        msg.write(MSG_CHANNEL_DATA);
        msg.writeInt(channel.remoteid);
        msg.writeBinaryString(data, off, len);
        sendMessage(msg);
    }

    /**
     * Rejects a remote channel-open request.
     *
     * @param channelid the requesting side's channel id
     * @param desc human readable reason for the rejection
     */
    private void sendChannelOpenFailure(int channelid, String desc) throws IOException {
        Packet msg = new Packet();
        // Must be the FAILURE id: openChannel() on the peer tells the two
        // replies apart by message id alone.
        msg.write(MSG_CHANNEL_OPEN_FAILURE);
        msg.writeInt(channelid);
        msg.writeString(desc);
        sendMessage(msg);
    }

    /**
     * Confirms a remote channel-open request, reporting the local channel id,
     * packet size and window together with optional channel specific data.
     */
    private void sendChannelOpenConfirmation(Channel channel, byte[] data) throws IOException {
        Packet msg = new Packet();
        msg.write(MSG_CHANNEL_OPEN_CONFIRMATION);
        msg.writeInt(channel.remoteid);
        msg.writeInt(channel.channelid);
        msg.writeInt(channel.getLocalPacket());
        msg.writeInt(channel.getLocalWindow());
        if (data != null)
            msg.write(data);
        sendMessage(msg);
    }

    /**
     * Sends a window adjust message for the given channel.
     *
     * @param channel the channel whose remote id addresses the message
     * @param increment the value written after the channel id
     * @throws IOException if the message cannot be written
     */
    public void sendWindowAdjust(Channel channel, int increment) throws IOException {
        Packet msg = new Packet();
        msg.write(MSG_CHANNEL_WINDOW_ADJUST);
        msg.writeInt(channel.remoteid);
        msg.writeInt(increment);
        sendMessage(msg);
    }

    /**
     * Sends a channel close message for the given channel. The local slot is
     * not released here; see {@link #freeChannel(Channel)}.
     *
     * @param channel the channel whose remote id addresses the message
     * @throws IOException if the message cannot be written
     */
    public void closeChannel(Channel channel) throws IOException {
        Packet msg = new Packet();
        msg.write(MSG_CHANNEL_CLOSE);
        msg.writeInt(channel.remoteid);
        sendMessage(msg);
    }

    /**
     * Closes every allocated channel on a best-effort basis: a failure to
     * close one channel does not prevent the others from being closed.
     */
    public void closeAllChannels() {
        for (int i = 0; i < channels.length; i++) {
            // Snapshot the slot; it may be cleared while the channel closes.
            Channel channel = channels[i];
            if (channel != null) {
                try {
                    channel.close();
                } catch (Throwable t) {
                    /* DEBUG */log.warn("Failed to close channel " + i, t);
                }
            }
        }
    }

    /**
     * Closes all channels, sends a disconnect message to the remote side and
     * closes both streams. Errors while sending are logged; errors while
     * closing the streams are ignored.
     *
     * @param reason reason code written to the disconnect message
     * @param desc description written to the disconnect message
     */
    public void disconnect(int reason, String desc) {
        /* DEBUG */log.info("Disconnecting multiplexed connection");
        closeAllChannels();
        try {
            Packet msg = new Packet();
            msg.write(MSG_DISCONNECT);
            msg.writeInt(reason);
            msg.writeString(desc);
            sendMessage(msg);
        } catch (IOException ex) {
            /* DEBUG */log.info("Error on disconnect", ex);
        } finally {
            try {
                in.close();
            } catch (Throwable ignored) {
                // Best effort: the connection is being torn down anyway.
            }
            try {
                out.close();
            } catch (Throwable ignored) {
                // Best effort: the connection is being torn down anyway.
            }
        }
    }

    /**
     * Writes one prepared message to the output stream and flushes it.
     * Synchronized so that messages from different threads are not
     * interleaved on the stream.
     *
     * @param msg the message to send
     * @throws IOException if the stream cannot be written
     */
    protected synchronized void sendMessage(Packet msg) throws IOException {
        msg.prepare();
        // log.info("Writing " + msg.size() + " bytes of socket data");
        out.write(msg.array(), 0, msg.size());
        out.flush();
    }

    /**
     * Reader loop of the connection: reads framed messages and dispatches
     * them until {@link MultiplexedConnection#stop()} is called or the stream
     * fails, then notifies the registered listeners.
     */
    class Runner implements Runnable {
        /**
         * Perform the multiplexed connection protocol
         */
        public void run() {
            try {
                while (running) {
                    try {
                        int msglength = in.readInt();
                        if (msglength <= 0) {
                            /* DEBUG */log.error("Invalid message length of " + msglength + " bytes");
                            stop();
                        } else {
                            // NOTE(review): the length is taken from the peer
                            // without an upper bound - consider a maximum.
                            byte[] tmp = new byte[msglength];
                            in.readFully(tmp);
                            Message msg = new Message(tmp);
                            switch (msg.getMessageId()) {
                            case MSG_CHANNEL_OPEN:
                                onChannelOpen(msg);
                                break;
                            case MSG_CHANNEL_OPEN_CONFIRMATION:
                            case MSG_CHANNEL_OPEN_FAILURE:
                            case MSG_CHANNEL_DATA:
                            case MSG_CHANNEL_WINDOW_ADJUST:
                            case MSG_CHANNEL_CLOSE:
                                // All of these are handled by the channel itself.
                                onChannelMessage(msg);
                                break;
                            case MSG_DISCONNECT:
                                onDisconnect(msg);
                                break;
                            default:
                                throw new IOException("Unexpected message id " + msg.getMessageId());
                            }
                        }
                    } catch (IOException ex) {
                        /* DEBUG */log.error("Multiplexed connection thread failed", ex);
                        stop();
                    }
                }
            } finally {
                // Also reached when an unchecked exception ends the loop, so
                // listeners always learn that the connection is gone.
                running = false;
                for (Enumeration it = listeners.elements(); it.hasMoreElements();) {
                    ((MultiplexedConnectionListener) it.nextElement()).onConnectionClose();
                }
            }
        }
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -