📄 defaulttcptransportmapping.java
字号:
break;
}
// Someone is ready for I/O, get the ready keys
Set readyKeys = selector.selectedKeys();
Iterator it = readyKeys.iterator();
// Walk through the ready keys collection and process date requests.
while (it.hasNext()) {
SelectionKey sk = (SelectionKey) it.next();
it.remove();
SocketChannel readChannel = null;
TcpAddress incomingAddress = null;
if (sk.isAcceptable()) {
// The key indexes into the selector so you
// can retrieve the socket that's ready for I/O
ServerSocketChannel nextReady =
(ServerSocketChannel) sk.channel();
Socket s = nextReady.accept().socket();
readChannel = s.getChannel();
readChannel.configureBlocking(false);
readChannel.register(selector,
SelectionKey.OP_READ);
incomingAddress = new TcpAddress(s.getInetAddress(),
s.getPort());
SocketEntry entry = new SocketEntry(incomingAddress, s);
sockets.put(incomingAddress, entry);
timeoutSocket(entry);
TransportStateEvent e =
new TransportStateEvent(DefaultTcpTransportMapping.this,
incomingAddress,
TransportStateEvent.
STATE_CONNECTED,
null);
fireConnectionStateChanged(e);
if (e.isCancelled()) {
logger.warn("Incoming connection cancelled");
s.close();
sockets.remove(incomingAddress);
readChannel = null;
}
}
else if (sk.isReadable()) {
readChannel = (SocketChannel) sk.channel();
incomingAddress =
new TcpAddress(readChannel.socket().getInetAddress(),
readChannel.socket().getPort());
}
else if (sk.isWritable()) {
try {
SocketChannel sc = (SocketChannel) sk.channel();
SocketEntry entry;
synchronized (pending) {
try {
entry = (SocketEntry) pending.removeFirst();
}
catch (NoSuchElementException nsex) {
// ignore
entry = null;
}
}
if (entry != null) {
writeMessage(entry, sc);
}
}
catch (IOException iox) {
if (logger.isDebugEnabled()) {
iox.printStackTrace();
}
logger.warn(iox);
TransportStateEvent e =
new TransportStateEvent(DefaultTcpTransportMapping.this,
incomingAddress,
TransportStateEvent.
STATE_DISCONNECTED_REMOTELY,
iox);
fireConnectionStateChanged(e);
sk.cancel();
}
}
else if (sk.isConnectable()) {
try {
SocketEntry entry;
synchronized (pending) {
try {
entry = (SocketEntry) pending.getFirst();
if (entry != null) {
SocketChannel sc = (SocketChannel) sk.channel();
if ((!sc.isConnected()) && (sc.finishConnect())) {
sc.configureBlocking(false);
logger.debug("Connected to " + entry.getPeerAddress());
// make sure conncetion is closed if not used for timeout
// micro seconds
timeoutSocket(entry);
sc.register(selector,
SelectionKey.OP_WRITE);
}
}
}
catch (NoSuchElementException nsex) {
// ignore
entry = null;
}
}
if (entry != null) {
TransportStateEvent e =
new TransportStateEvent(DefaultTcpTransportMapping.this,
incomingAddress,
TransportStateEvent.
STATE_CONNECTED,
null);
fireConnectionStateChanged(e);
}
else {
logger.warn("Message not found on finish connection");
}
}
catch (IOException iox) {
if (logger.isDebugEnabled()) {
iox.printStackTrace();
}
logger.warn(iox);
sk.cancel();
}
}
if (readChannel != null) {
try {
readMessage(sk, readChannel, incomingAddress);
}
catch (IOException iox) {
// IO exception -> channel closed remotely
if (logger.isDebugEnabled()) {
iox.printStackTrace();
}
logger.warn(iox);
sk.cancel();
readChannel.close();
TransportStateEvent e =
new TransportStateEvent(DefaultTcpTransportMapping.this,
incomingAddress,
TransportStateEvent.
STATE_DISCONNECTED_REMOTELY,
iox);
fireConnectionStateChanged(e);
}
}
}
}
}
catch (NullPointerException npex) {
// There seems to happen a NullPointerException within the select()
npex.printStackTrace();
logger.warn("NullPointerException within select()?");
}
processPending();
}
if (ssc != null) {
ssc.close();
}
if (selector != null) {
selector.close();
}
}
catch (IOException iox) {
logger.error(iox);
lastError = iox;
}
if (!stop) {
stop = true;
synchronized (DefaultTcpTransportMapping.this) {
server = null;
}
}
}
/**
 * Reads whatever is currently available on {@code readChannel} and, once a
 * complete message has been received, hands it to {@code dispatchMessage}.
 *
 * <p>Two paths exist:
 * <ul>
 * <li>If the socket entry registered for {@code incomingAddress} holds a
 * pending read buffer, the read continues into that buffer. When it is full
 * the buffer is detached from the entry and dispatched.</li>
 * <li>Otherwise a header of {@code messageLengthDecoder.getMinHeaderLength()}
 * bytes is read into {@code buf}, the message length is decoded from it and
 * the rest of the message is read. A message that is not yet complete is
 * copied into a buffer of its own and stored on the entry so that a later
 * call can finish it.</li>
 * </ul>
 *
 * <p>NOTE(review): a header read that returns fewer bytes than the minimum
 * header length (but more than -1) is neither dispatched nor preserved here.
 *
 * @param sk the selection key that signalled readiness; cancelled when the
 *     remote side has closed the connection
 * @param readChannel the channel to read from
 * @param incomingAddress the peer address, used to look up the socket entry
 *     and reported in events and log output
 * @throws IOException if reading from, registering or closing the channel
 *     fails
 */
private void readMessage(SelectionKey sk, SocketChannel readChannel,
                         TcpAddress incomingAddress) throws IOException {
  // note that socket has been used
  SocketEntry entry = (SocketEntry) sockets.get(incomingAddress);
  if (entry != null) {
    entry.used();
    ByteBuffer readBuffer = entry.getReadBuffer();
    if (readBuffer != null) {
      // Continue a message whose first part arrived in an earlier call.
      int continued = readChannel.read(readBuffer);
      if (continued < 0) {
        // End of stream in the middle of a message: the partial data can
        // never be completed, so drop it and report the disconnect instead
        // of leaving the channel registered at EOF.
        entry.setReadBuffer(null);
        remoteClosed(sk, readChannel, incomingAddress);
        return;
      }
      if (readBuffer.hasRemaining()) {
        readChannel.register(selector,
                             SelectionKey.OP_READ,
                             entry);
      }
      else {
        // Detach the completed buffer first; otherwise the next read on
        // this connection would be appended to an already full buffer.
        entry.setReadBuffer(null);
        dispatchMessage(incomingAddress, readBuffer, readBuffer.capacity());
      }
      return;
    }
  }
  ByteBuffer byteBuffer = ByteBuffer.wrap(buf);
  byteBuffer.limit(messageLengthDecoder.getMinHeaderLength());
  long bytesRead = readChannel.read(byteBuffer);
  if (logger.isDebugEnabled()) {
    logger.debug("Reading header "+bytesRead+" bytes from " +
                 incomingAddress);
  }
  MessageLength messageLength = new MessageLength(0, Integer.MIN_VALUE);
  if (bytesRead == messageLengthDecoder.getMinHeaderLength()) {
    messageLength =
        messageLengthDecoder.getMessageLength(ByteBuffer.wrap(buf));
    if (logger.isDebugEnabled()) {
      logger.debug("Message length is "+messageLength);
    }
    if ((messageLength.getMessageLength() > getMaxInboundMessageSize()) ||
        (messageLength.getMessageLength() <= 0)) {
      // Also reached for non-positive lengths; log text kept unchanged.
      logger.error("Received message length "+messageLength+
                   " is greater than inboundBufferSize "+
                   getMaxInboundMessageSize());
      if (entry != null) {
        synchronized(entry) {
          entry.getSocket().close();
          logger.info("Socket to "+entry.getPeerAddress()+
                      " closed due to an error");
        }
      }
      else {
        // No entry is registered for this peer, so close the channel itself.
        readChannel.close();
        logger.info("Socket to "+incomingAddress+
                    " closed due to an error");
      }
    }
    else {
      byteBuffer.limit(messageLength.getMessageLength());
      bytesRead += readChannel.read(byteBuffer);
      if (bytesRead == messageLength.getMessageLength()) {
        dispatchMessage(incomingAddress, byteBuffer, bytesRead);
      }
      else {
        // Incomplete message: keep what has been received so far in a
        // buffer sized for the whole message. The count must be taken
        // before flip(), and the new buffer's position must be advanced so
        // that the continuation read appends instead of overwriting.
        int received = byteBuffer.position();
        byte[] message = new byte[byteBuffer.limit()];
        byteBuffer.flip();
        byteBuffer.get(message, 0, received);
        ByteBuffer partial = ByteBuffer.wrap(message);
        partial.position(received);
        if (entry != null) {
          entry.setReadBuffer(partial);
        }
        else {
          logger.warn("No socket entry for "+incomingAddress+
                      ", incomplete message dropped");
        }
      }
      readChannel.register(selector,
                           SelectionKey.OP_READ,
                           entry);
    }
  }
  else if (bytesRead < 0) {
    remoteClosed(sk, readChannel, incomingAddress);
  }
}

/**
 * Cancels the key, closes the channel and fires a
 * {@code STATE_DISCONNECTED_REMOTELY} event for the given peer.
 */
private void remoteClosed(SelectionKey sk, SocketChannel readChannel,
                          TcpAddress incomingAddress) throws IOException {
  logger.debug("Socket closed remotely");
  sk.cancel();
  readChannel.close();
  TransportStateEvent e =
      new TransportStateEvent(DefaultTcpTransportMapping.this,
                              incomingAddress,
                              TransportStateEvent.
                              STATE_DISCONNECTED_REMOTELY,
                              null);
  fireConnectionStateChanged(e);
}
/**
 * Flips {@code byteBuffer} and passes the first {@code bytesRead} bytes of
 * its backing array to {@code fireProcessMessage}.
 *
 * <p>When {@code isAsyncMsgProcessingSupported()} returns true the bytes are
 * copied into a fresh array first; otherwise the backing array itself is
 * wrapped without copying.
 *
 * @param incomingAddress the peer address the message was received from
 * @param byteBuffer an array-backed buffer holding the message
 * @param bytesRead the number of message bytes, counted from array index 0
 */
private void dispatchMessage(TcpAddress incomingAddress,
                             ByteBuffer byteBuffer, long bytesRead) {
  final int length = (int) bytesRead;
  byteBuffer.flip();
  if (logger.isDebugEnabled()) {
    logger.debug("Received message from " + incomingAddress +
                 " with length " + bytesRead + ": " +
                 new OctetString(byteBuffer.array(), 0, length).toHexString());
  }
  final ByteBuffer payload;
  if (!isAsyncMsgProcessingSupported()) {
    // Synchronous processing: wrap the backing array directly.
    payload = ByteBuffer.wrap(byteBuffer.array(), 0, length);
  }
  else {
    // Asynchronous processing: hand over a private copy of the bytes.
    byte[] copy = new byte[length];
    System.arraycopy(byteBuffer.array(), 0, copy, 0, length);
    payload = ByteBuffer.wrap(copy);
  }
  fireProcessMessage(incomingAddress, payload);
}
/**
 * Takes the next message from {@code entry}, writes it to {@code sc} and
 * then registers the channel with the selector for {@code OP_READ}.
 * Does nothing when the entry has no message.
 *
 * <p>NOTE(review): {@code write} is called once and its result is not
 * checked; if the channel accepts only part of the buffer, the remainder is
 * not retried here. Confirm whether that can occur for this channel.
 *
 * @param entry the socket entry supplying the message and the peer address
 * @param sc the channel to write to
 * @throws IOException if writing to or registering the channel fails
 */
private void writeMessage(SocketEntry entry, SocketChannel sc) throws
    IOException {
  final byte[] outgoing = entry.nextMessage();
  if (outgoing == null) {
    return;
  }
  sc.write(ByteBuffer.wrap(outgoing));
  if (logger.isDebugEnabled()) {
    logger.debug("Send message with length " +
                 outgoing.length + " to " +
                 entry.getPeerAddress() + ": " +
                 new OctetString(outgoing).toHexString());
  }
  sc.register(selector, SelectionKey.OP_READ);
}
/**
 * Sets the {@code stop} flag and interrupts the thread currently referenced
 * by {@code server}, if there is one.
 */
public void close() {
  stop = true;
  // Read the field once so the null check and the call use the same reference.
  final ServerThread current = server;
  if (current == null) {
    return;
  }
  current.interrupt();
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -