📄 channel.java
字号:
/*
* Created on 08-May-2005
*
* TODO To change the template for this generated file go to
* Window - Preferences - Java - Code Style - Code Templates
*/
package com.maverick.multiplex;
import java.io.*;
/* DEBUG */import org.apache.commons.logging.*;
import java.util.Vector;
import java.util.Enumeration;
/**
*
* @author lee
*
* TODO To change the template for this generated type comment go to
* Window - Preferences - Java - Code Style - Code Templates
*/
public abstract class Channel {
MultiplexedConnection connection;
int channelid;
int remoteid;
String type;
DataWindow remotewindow;
DataWindow localwindow;
Vector listeners = new Vector();
ChannelInputStream in;
ChannelOutputStream out;
int windowSequence = 0;
/* DEBUG */Log log = LogFactory.getLog(Channel.class);
MessageObserver stickyMessages = new MessageObserver() {
public boolean wantsNotification(Message msg) {
switch(msg.getMessageId()) {
case MultiplexedConnection.MSG_CHANNEL_CLOSE:
return true;
default:
return false;
}
}
};
final MessageObserver WINDOW_ADJUST_MESSAGES = new MessageObserver() {
public boolean wantsNotification(Message msg) {
switch(msg.getMessageId()) {
case MultiplexedConnection.MSG_CHANNEL_WINDOW_ADJUST:
case MultiplexedConnection.MSG_CHANNEL_CLOSE:
return true;
default:
return false;
}
}
};
final MessageObserver CHANNEL_CLOSE_MESSAGES = new MessageObserver() {
public boolean wantsNotification(Message msg) {
switch(msg.getMessageId()) {
case MultiplexedConnection.MSG_CHANNEL_CLOSE:
return true;
default:
return false;
}
}
};
final MessageObserver CHANNEL_DATA_MESSAGES = new MessageObserver() {
public boolean wantsNotification(Message msg) {
// Access to this observer is synchronized by the ThreadSynchronizer
// so we can flag our InputStream as blocking when the method is
// called and released once we have found a message
switch(msg.getMessageId()) {
case MultiplexedConnection.MSG_CHANNEL_DATA:
case MultiplexedConnection.MSG_CHANNEL_CLOSE:
return true;
default:
return false;
}
}
};
MessageStore messageStore = new MessageStore(this, stickyMessages);
public Channel(String type, int localpacket, int localwindow) {
this.type = type;
this.localwindow = new DataWindow(localpacket, localwindow);
in = new ChannelInputStream(CHANNEL_DATA_MESSAGES);
out = new ChannelOutputStream();
}
public void init(MultiplexedConnection connection, int remoteid, int remotepacket, int remotewindow) {
this.connection = connection;
this.remoteid = remoteid;
this.remotewindow = new DataWindow(remotewindow, remotepacket);
}
boolean closing = false;
public void close() {
boolean performClose = false;;
synchronized(this) {
if(!closing)
performClose = closing = true;
}
if (performClose) {
try {
// Close the ChannelOutputStream
out.close();
// Send our close message
connection.closeChannel(this);
synchronized(messageStore) {
if (!messageStore.isClosed()) {
// Wait for the other side to close the channel
processMessages(CHANNEL_CLOSE_MESSAGES);
}
}
} catch (EOFException eof) {
// Ignore this is the message store informing of close/eof
} catch (IOException ex) {
// IO Error during close so the connection has dropped
connection.disconnect(1,
"IOException during channel close: " +
ex.getMessage());
} finally {
if(connection!=null)
connection.freeChannel(this);
onChannelClose();
for(Enumeration e = listeners.elements(); e.hasMoreElements();) {
((ChannelListener)e.nextElement()).onChannelClose(this);
}
}
}
}
public abstract byte[] open(byte[] data) throws IOException;
public abstract byte[] create() throws IOException;
void fireChannelOpen() {
onChannelOpen();
for (Enumeration e = listeners.elements(); e.hasMoreElements(); ) {
((ChannelListener) e.nextElement()).onChannelOpen(this);
}
}
public abstract void onChannelOpen();
public abstract void onChannelClose();
public void addListener(ChannelListener listener) {
if(listener!=null)
listeners.addElement(listener);
}
public OutputStream getOutputStream() {
return out;
}
public InputStream getInputStream() {
return in;
}
public String getType() {
return type;
}
public int getLocalWindow() {
return localwindow.available();
}
public int getLocalPacket() {
return localwindow.getPacketSize();
}
public boolean isClosed() {
return messageStore.isClosed();
}
protected void adjustWindow(int increment) throws IOException {
/* DEBUG *///log.info("Adjusting local window with " + increment + " bytes");
localwindow.adjust(increment);
connection.sendWindowAdjust(this, increment);
}
Message processMessages(MessageObserver messagefilter)
throws IOException,
EOFException {
Message msg;
/**
* Collect the next channel message from the connection protocol
*/
msg = messageStore.nextMessage(messagefilter);
switch (msg.getMessageId()) {
case MultiplexedConnection.MSG_CHANNEL_WINDOW_ADJUST:
int i = (int)msg.readInt();
remotewindow.adjust(i);
/* DEBUG *///log.info("ADJUST " + i + " window=" + remotewindow.available() + " seqence=" + windowSequence);
windowSequence++;
break;
case MultiplexedConnection.MSG_CHANNEL_DATA:
msg.skip(4); // Skip the length
in.write(msg.array(), msg.getPosition(), msg.available());
break;
case MultiplexedConnection.MSG_CHANNEL_CLOSE:
messageStore.close();
close();
throw new EOFException("The channel is closed");
default:
break;
}
return msg;
}
private void jbInit() throws Exception {
}
class ChannelOutputStream
extends OutputStream {
boolean isEOF = false;
public void write(int b) throws java.io.IOException {
write(new byte[] { (byte) b}
, 0, 1);
}
public synchronized void write(byte[] buf, int offset, int len) throws IOException {
int write;
//log.info("Write " + len + " bytes");
do {
if(remotewindow.available() <= 0) {
/* DEBUG *///log.info("OutputStream::need window space");
Message msg = processMessages(WINDOW_ADJUST_MESSAGES);
/* DEBUG *///log.info("OutputStream::got window space window=" + remotewindow.available());
}
if(isClosed()) {
throw new IOException("The channel is closed!");
}
write = remotewindow.available() < remotewindow.getPacketSize()
? (remotewindow.available() < len ? remotewindow.available() : len)
:
(remotewindow.getPacketSize() < len ? remotewindow.getPacketSize() :
len);
if(write > 0) {
// log.info("Sending " + write + " bytes window=" + remotewindow.available());
connection.sendChannelData(Channel.this, buf, offset, write);
remotewindow.consume(write);
//log.info("CONSUME " + write + " window=" + remotewindow.available());
len -= write;
offset += write;
}
}
while(len > 0);
}
public void close() throws IOException {
Channel.this.close();
}
}
class ChannelInputStream
extends InputStream {
byte[] buffer;
int unread = 0;
int position = 0;
int base = 0;
MessageObserver messagefilter;
long transfered = 0;
ChannelInputStream(MessageObserver messagefilter) {
buffer = new byte[localwindow.available()];
this.messagefilter = messagefilter;
}
public synchronized int available() throws IOException {
try {
if (unread == 0) {
if (messageStore.hasMessage(messagefilter) != null) {
processMessages(messagefilter);
}
}
return unread;
} catch (EOFException ex) {
return -1;
}
}
public void close() {
Channel.this.close();
}
public int read() throws IOException {
byte[] b = new byte[1];
int ret = read(b, 0, 1);
if (ret > 0) {
return b[0] & 0xFF;
}
else {
return -1;
}
}
public long skip(long len) throws IOException {
int count = unread < len ? unread : (int)len;
try {
if (count == 0 && isClosed())
throw new EOFException("The inputstream is closed");
int index = base;
base = (base + count) % buffer.length;
unread -= count;
if ((unread + localwindow.available()) < (buffer.length / 2)) {
adjustWindow(buffer.length - localwindow.available() - unread);
}
} finally {
transfered += count;
}
return count;
}
public synchronized int read(byte[] buf, int offset, int len) throws IOException {
try {
if(available() == -1)
return -1;
if(unread <= 0 && !isClosed()) {
processMessages(messagefilter);
}
int count = unread < len ? unread : len;
if(count == 0 && isClosed())
return -1;
int index = base;
base = (base + count) % buffer.length;
if (buffer.length - index > count) {
System.arraycopy(buffer, index, buf, offset, count);
}
else {
int remaining = buffer.length - index;
System.arraycopy(buffer, index, buf, offset, remaining);
System.arraycopy(buffer, 0, buf, offset + remaining,
count - remaining);
}
unread -= count;
if ((unread + localwindow.available()) < (buffer.length / 2)) {
adjustWindow(buffer.length - localwindow.available() - unread);
}
transfered += count;
return count;
}
catch (EOFException ex) {
return -1;
}
}
void write(byte[] buf, int offset, int len) throws IOException {
if (localwindow.available() < len) {
/* DEBUG *///log.info("Window space exceeded!!");
connection.disconnect(0,
"Received data exceeding current window space");
throw new IOException("Window space exceeded");
}
int i = 0;
int index;
int count;
while(i < len) {
// Copy data up to the end of the array and start back
// at the beginning
index = (base + unread) % buffer.length;
count = ((buffer.length - index < len - i)
? buffer.length - index
: len - i);
System.arraycopy(buf, offset+i, buffer, index, count);
unread+=count;
i+=count;
}
localwindow.consume(len);
}
}
class DataWindow {
int windowsize;
int packetsize;
DataWindow(int windowsize, int packetsize) {
this.windowsize = windowsize;
this.packetsize = packetsize;
}
int getPacketSize() {
return packetsize;
}
void adjust(int count) {
windowsize += count;
}
void consume(int count) {
windowsize -= count;
}
int available() {
return windowsize;
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -