📄 multiplexer.java
字号:
handshake(_out, _in);
}
/**
 * Exchanges the protocol magic number and version with the peer when a
 * connection is first established. The local values are written and
 * flushed first; the peer's values are then read and compared against
 * <code>MAGIC</code> and <code>VERSION</code>, and a
 * <code>ProtocolException</code> is raised on the first mismatch.
 * Subclasses may extend this behaviour.
 *
 * @param out the endpoint's output stream
 * @param in  the endpoint's input stream
 * @throws IOException for any I/O error
 */
protected void handshake(DataOutputStream out, DataInputStream in)
        throws IOException {
    // announce the local protocol details before reading the peer's
    out.writeInt(MAGIC);
    out.writeInt(VERSION);
    out.flush();

    final int peerMagic = in.readInt();
    if (peerMagic != MAGIC) {
        throw new ProtocolException("Expected protocol magic=" + MAGIC
                                    + ", but received=" + peerMagic);
    }

    // the version is only read once the magic number has been accepted
    final int peerVersion = in.readInt();
    if (peerVersion != VERSION) {
        throw new ProtocolException("Expected protocol version=" + VERSION
                                    + ", but received=" + peerVersion);
    }
}
/**
 * Sends the caller's credentials to the peer on initial connection and
 * waits for its verdict.
 * <p>
 * A <code>null</code> principal is sent as <code>AUTH_NONE</code>; a
 * <code>BasicPrincipal</code> is sent as <code>AUTH_BASIC</code> followed
 * by its name and password. Any other principal type is rejected with an
 * <code>IOException</code> before anything is written. On success the
 * principal is recorded in <code>_principal</code>.
 * <p>
 * NOTE(review): only <code>IOException</code> triggers the endpoint close
 * below; whether the "Connection refused" path also needs to close the
 * endpoint depends on the caller - confirm.
 *
 * @param principal the security principal. May be <code>null</code>
 * @throws IOException       for any I/O error; the endpoint is closed
 *                           before the exception is rethrown
 * @throws SecurityException if connection is refused by the server
 */
protected void authenticate(Principal principal)
        throws IOException, SecurityException {
    try {
        if (principal == null) {
            // anonymous connection
            _out.writeByte(AUTH_NONE);
        } else if (principal instanceof BasicPrincipal) {
            BasicPrincipal credentials = (BasicPrincipal) principal;
            _out.writeByte(AUTH_BASIC);
            _out.writeUTF(credentials.getName());
            _out.writeUTF(credentials.getPassword());
        } else {
            throw new IOException(
                    "Cannot authenticate with principal of type "
                    + principal.getClass().getName());
        }
        _out.flush();

        // a single byte reply carries the peer's decision
        byte reply = _in.readByte();
        if (reply != AUTH_OK) {
            throw new SecurityException("Connection refused");
        }
    } catch (IOException exception) {
        // terminate the connection
        _endpoint.close();
        throw exception;
    }
    _principal = principal;
}
/**
 * Reads the credentials sent by the peer on initial connection and asks
 * the supplied authenticator to accept or reject them.
 * <p>
 * An <code>AUTH_BASIC</code> packet yields a <code>BasicPrincipal</code>
 * built from the name and password that follow it; an
 * <code>AUTH_NONE</code> packet yields a <code>null</code> principal. Any
 * other packet type is an error. The verdict (<code>AUTH_OK</code> or
 * <code>AUTH_DENIED</code>) is written back and flushed; on denial a
 * <code>SecurityException</code> is then thrown. On acceptance the
 * principal is recorded in <code>_principal</code>.
 *
 * @param authenticator the authenticator
 * @throws IOException       for any I/O error; the endpoint is closed
 *                           before the exception is rethrown
 * @throws ResourceException if the authenticator cannot authenticate; the
 *                           endpoint is closed before the exception is
 *                           rethrown
 */
protected void authenticate(Authenticator authenticator)
        throws IOException, ResourceException {
    try {
        final byte authType = _in.readByte();
        Principal client = null;
        if (authType == AUTH_BASIC) {
            String user = _in.readUTF();
            String secret = _in.readUTF();
            client = new BasicPrincipal(user, secret);
        } else if (authType != AUTH_NONE) {
            throw new IOException("Invalid packet type: " + authType);
        }

        if (!authenticator.authenticate(client)) {
            // tell the peer before failing locally
            _out.writeByte(AUTH_DENIED);
            _out.flush();
            throw new SecurityException("User " + client
                                        + " unauthorised");
        }
        _out.writeByte(AUTH_OK);
        _out.flush();
        _principal = client;
    } catch (IOException exception) {
        // terminate the connection
        _endpoint.close();
        throw exception;
    } catch (ResourceException exception) {
        // terminate the connection
        _endpoint.close();
        throw exception;
    }
}
/**
 * Opens a new channel. A fresh identifier is allocated and the channel
 * registered while holding the <code>_channels</code> lock; the
 * <code>OPEN</code> message for that identifier is sent after the lock
 * has been released.
 *
 * @return a new channel
 * @throws IOException if a channel can't be opened
 */
protected Channel open() throws IOException {
    int id;
    Channel opened;
    synchronized (_channels) {
        id = getNextChannelId();
        opened = addChannel(id);
    }
    send(OPEN, id);
    return opened;
}
/**
 * Reads one packet type byte from the endpoint and dispatches to the
 * handler for that packet type. An unrecognised type is treated as an
 * error.
 * <p>
 * Any exception raised while reading or handling the packet shuts the
 * multiplexer down. The listener is notified of the error only if the
 * multiplexer was not already marked closed when the exception occurred.
 */
private void multiplex() {
    try {
        final byte packetType = _in.readByte();
        switch (packetType) {
            case OPEN:
                handleOpen();
                break;
            case CLOSE:
                handleClose();
                break;
            case REQUEST:
                handleRequest();
                break;
            case RESPONSE:
                handleResponse();
                break;
            case DATA:
                handleData();
                break;
            case PING_REQUEST:
                handlePingRequest();
                break;
            case PING_RESPONSE:
                handlePingResponse();
                break;
            case FLOW_READ:
                handleFlowRead();
                break;
            case SHUTDOWN:
                handleShutdown();
                break;
            default:
                throw new IOException("Unrecognised message type: "
                                      + packetType);
        }
    } catch (Exception failure) {
        // capture the state before shutdown() overwrites it
        final boolean alreadyClosed = _closed;
        shutdown();
        if (alreadyClosed) {
            // errors after a close are expected; stay silent
            return;
        }
        _log.debug("Multiplexer shutting down on error", failure);
        // error notify the listener
        _listener.error(failure);
    }
}
/**
 * Shuts down the multiplexer: sets the <code>_closed</code> flag and
 * invokes <code>disconnected()</code> on every registered channel. The
 * set of channels is copied while holding the <code>_channels</code>
 * lock, and the callbacks run after the lock has been released.
 */
private void shutdown() {
    _closed = true;

    // snapshot the channels so they can be notified outside the lock
    Channel[] snapshot;
    synchronized (_channels) {
        snapshot = (Channel[]) _channels.values().toArray(new Channel[0]);
    }

    int index = 0;
    while (index < snapshot.length) {
        snapshot[index].disconnected();
        ++index;
    }
}
/**
 * Handles an <code>OPEN</code> packet: reads the channel identifier from
 * the stream and registers a new channel under it.
 *
 * @throws IOException for any I/O error, or if a channel is already
 *                     registered with the identifier read
 */
private void handleOpen() throws IOException {
    final int id = _in.readUnsignedShort();
    final Integer lookup = new Integer(id);
    synchronized (_channels) {
        final boolean inUse = (_channels.get(lookup) != null);
        if (inUse) {
            throw new IOException(
                    "A channel already exists with identifier: " + lookup);
        }
        addChannel(id);
    }
}
/**
 * Handles a <code>CLOSE</code> packet: reads the channel identifier from
 * the stream, removes the corresponding channel from the registry and
 * closes it. Removal and close both happen while holding the
 * <code>_channels</code> lock.
 *
 * @throws IOException for any I/O error, or if no channel is registered
 *                     with the identifier read
 */
private void handleClose() throws IOException {
    final int id = _in.readUnsignedShort();
    final Integer lookup = new Integer(id);
    synchronized (_channels) {
        final Object removed = _channels.remove(lookup);
        if (removed == null) {
            throw new IOException(
                    "No channel exists with identifier: " + lookup);
        }
        ((Channel) removed).close();
    }
}
/**
 * Handle a <code>REQUEST</code> packet.
 * <p>
 * The packet's data is first delivered to the target channel via
 * {@link #handleData()}. A task is then handed to <code>_pool</code> which
 * passes the channel to the listener's <code>request()</code> callback.
 * <p>
 * If handing the task to the pool is interrupted, the calling thread's
 * interrupt status is restored and an <code>InterruptedIOException</code>
 * carrying the original exception as its cause is thrown.
 *
 * @throws IOException if an I/O error occurs, no channel exists matching
 *                     that read from the packet, or the thread is
 *                     interrupted while scheduling the request
 */
private void handleRequest() throws IOException {
    final Channel channel = handleData();
    Runnable request = new Runnable() {
        public void run() {
            if (_log.isDebugEnabled()) {
                _log.debug("handleRequest() [channel="
                           + channel.getId() + "]");
            }
            // todo - need to handle closed()
            _listener.request(channel);
            if (_log.isDebugEnabled()) {
                _log.debug("handleRequest() [channel="
                           + channel.getId() + "] - end");
            }
        }
    };
    try {
        _pool.execute(request);
    } catch (InterruptedException exception) {
        // catching InterruptedException clears the interrupt status;
        // restore it so code further up the stack can still observe it
        Thread.currentThread().interrupt();
        InterruptedIOException interrupted =
                new InterruptedIOException(exception.getMessage());
        // InterruptedIOException has no cause constructor
        interrupted.initCause(exception);
        throw interrupted;
    }
}
/**
 * Handles a <code>RESPONSE</code> packet. The work is delegated entirely
 * to {@link #handleData()}; the channel it returns is not needed here.
 *
 * @throws IOException if an I/O error occurs, or no channel exists matching
 *                     that read from the packet
 */
private void handleResponse() throws IOException {
    // a response needs no processing beyond delivering its data
    handleData();
}
/**
 * Handles a <code>PING_REQUEST</code> packet by reading the target channel
 * from the stream and forwarding the ping request to it.
 *
 * @throws IOException if an I/O error occurs, or no channel exists
 *                     matching the identifier read from the packet
 */
private void handlePingRequest() throws IOException {
    readChannel().handlePingRequest();
}
/**
 * Handles a <code>PING_RESPONSE</code> packet by reading the target
 * channel from the stream and forwarding the ping response to it.
 *
 * @throws IOException if an I/O error occurs, or no channel exists
 *                     matching the identifier read from the packet
 */
private void handlePingResponse() throws IOException {
    readChannel().handlePingResponse();
}
/**
 * Handles a <code>DATA</code> packet. The channel identifier and then the
 * data length are read from the stream, after which the channel's input
 * stream is asked to receive that many bytes from the endpoint.
 * <p>
 * NOTE(review): the length is passed on as read; any validation of it
 * (e.g. negative values) is presumably done by <code>receive()</code> -
 * confirm.
 *
 * @return the channel to handle the packet
 * @throws IOException if an I/O error occurs, or no channel exists matching
 *                     that read from the packet
 */
private Channel handleData() throws IOException {
    final Channel target = readChannel();
    final int payloadLength = _in.readInt();
    target.getMultiplexInputStream().receive(_in, payloadLength);
    return target;
}
/**
 * Handles a <code>FLOW_READ</code> packet. The channel identifier and an
 * integer count are read from the stream, and the count is passed to
 * <code>notifyRead()</code> on the channel's output stream.
 *
 * @throws IOException if an I/O error occurs, or no channel exists
 *                     matching the identifier read from the packet
 */
private void handleFlowRead() throws IOException {
    final Channel target = readChannel();
    final int count = _in.readInt();
    target.getMultiplexOutputStream().notifyRead(count);
}
/**
 * Handles a <code>SHUTDOWN</code> packet: shuts this multiplexer down and
 * then informs the listener, via <code>closed()</code>, that the
 * connection has been closed.
 */
private void handleShutdown() {
    // local shutdown first, so channels are disconnected before the
    // listener hears about the close
    shutdown();
    _listener.closed();
}
/**
 * Creates a channel for the given identifier, together with its output
 * and input streams (each sized from <code>BUFFER_SIZE</code>), and
 * registers it in <code>_channels</code>.
 * <p>
 * NOTE: Must be invoked with <code>_channels</code> synchronized
 *
 * @param channelId the channel identifier
 * @return the new channel
 */
private Channel addChannel(int channelId) {
    final int capacity = BUFFER_SIZE;
    MultiplexOutputStream output =
            new MultiplexOutputStream(channelId, this, capacity, capacity);
    MultiplexInputStream input =
            new MultiplexInputStream(channelId, this, capacity);
    Channel created = new Channel(channelId, this, input, output);
    _channels.put(new Integer(channelId), created);
    return created;
}
/**
 * Reads an unsigned 16-bit channel identifier from the stream and looks
 * up the channel registered under it.
 *
 * @return the channel corresponding to the read channel identifier
 * @throws IOException for any I/O error, or if there is no corresponding
 *                     channel
 */
private Channel readChannel() throws IOException {
    return getChannel(_in.readUnsignedShort());
}
/**
 * Looks up a registered channel by its identifier. The lookup is done
 * while holding the <code>_channels</code> lock.
 *
 * @param channelId the channel identifier
 * @return the channel corresponding to <code>channelId</code>
 * @throws IOException if there is no corresponding channel
 */
private Channel getChannel(int channelId) throws IOException {
    synchronized (_channels) {
        Channel found = (Channel) _channels.get(new Integer(channelId));
        if (found != null) {
            return found;
        }
    }
    throw new IOException(
            "No channel exists with identifier: " + channelId);
}
/**
 * Returns the next available channel identifier. Channel identifiers
 * generated on the client side are in the range 0x0..0x7FFF, on the server
 * side, 0x8000-0xFFFF
 * <p>
 * Candidates are taken by advancing <code>_seed</code> (wrapping within
 * the range) until one is found that is not registered in
 * <code>_channels</code>. At most one full pass over the range is made, so
 * the search terminates even when every identifier is in use.
 * <p>
 * NOTE: Must be invoked with <code>_channels</code> synchronized
 *
 * @return the next channel identifier
 * @throws IOException if the connection is closed, or if every identifier
 *                     in this side's range is already in use
 */
private int getNextChannelId() throws IOException {
    final int mask = 0x7fff;
    final int serverIdBase = 0x8000;
    int channelId = -1;
    // there are exactly mask + 1 candidate identifiers; probing each once
    // is sufficient, and avoids spinning forever with the lock held when
    // all of them are taken
    for (int attempt = 0; attempt <= mask && !_closed; ++attempt) {
        _seed = (_seed + 1) & mask;
        int candidate = (_client) ? _seed : _seed + serverIdBase;
        if (!_channels.containsKey(new Integer(candidate))) {
            channelId = candidate;
            break;
        }
    }
    if (_closed) {
        throw new IOException("Connection has been closed");
    }
    if (channelId < 0) {
        throw new IOException("No channel identifiers available");
    }
    return channelId;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -