📄 defaulttcptransportmapping.java
字号:
entry.getSocket().close();
}
logger.info("Socket to "+entry.getPeerAddress()+
" closed due to timeout");
}
catch (IOException ex) {
logger.error(ex);
}
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Scheduling " +
((entry.getLastUse() + connectionTimeout) - now));
}
socketCleaner.schedule(new SocketTimeout(entry),
(entry.getLastUse() + connectionTimeout) - now);
}
}
}
class ServerThread extends Thread {
private static final int MINIMUM_HEADER_LENGTH = 6;
private byte[] buf;
private volatile boolean stop = false;
private Throwable lastError = null;
private ServerSocketChannel ssc;
private Selector selector;
private LinkedList pending = new LinkedList();
public ServerThread() throws IOException {
buf = new byte[getMaxInboundMessageSize()];
// Selector for incoming requests
selector = Selector.open();
if (serverEnabled) {
// Create a new server socket and set to non blocking mode
ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
// Bind the server socket
InetSocketAddress isa = new InetSocketAddress(tcpAddress.getInetAddress(),
tcpAddress.getPort());
ssc.socket().bind(isa);
// Register accepts on the server socket with the selector. This
// step tells the selector that the socket wants to be put on the
// ready list when accept operations occur, so allowing multiplexed
// non-blocking I/O to take place.
ssc.register(selector, SelectionKey.OP_ACCEPT);
}
}
private void processPending() {
synchronized (pending) {
while (pending.size() > 0) {
SocketEntry entry = (SocketEntry)pending.removeFirst();
try {
// Register the channel with the selector, indicating
// interest in connection completion and attaching the
// target object so that we can get the target back
// after the key is added to the selector's
// selected-key set
if (entry.getSocket().isConnected()) {
entry.getSocket().getChannel().register(selector,
SelectionKey.OP_WRITE,
entry);
}
else {
entry.getSocket().getChannel().register(selector,
SelectionKey.OP_CONNECT,
entry);
}
}
catch (IOException iox) {
logger.error(iox);
// Something went wrong, so close the channel and
// record the failure
try {
entry.getSocket().getChannel().close();
}
catch (IOException ex) {
logger.error(ex);
}
lastError = iox;
}
}
}
}
public Throwable getLastError() {
return lastError;
}
public void sendMessage(Address address, byte[] message)
throws java.io.IOException
{
Socket s = null;
SocketEntry entry = (SocketEntry) sockets.get(address);
if (entry != null) {
s = entry.getSocket();
}
if ((s == null) || (s.isClosed())) {
SocketChannel sc = null;
try {
// Open the channel, set it to non-blocking, initiate connect
sc = SocketChannel.open();
sc.configureBlocking(false);
sc.connect(new InetSocketAddress(((TcpAddress)address).getInetAddress(),
((TcpAddress)address).getPort()));
s = sc.socket();
entry = new SocketEntry((TcpAddress)address, s);
entry.addMessage(message);
sockets.put(address, entry);
synchronized (pending) {
pending.add(entry);
}
selector.wakeup();
logger.debug("Trying to connect to "+address);
}
catch (IOException iox) {
logger.error(iox);
throw iox;
}
}
else {
entry.addMessage(message);
synchronized (pending) {
pending.add(entry);
}
selector.wakeup();
}
}
public void run() {
// Here's where everything happens. The select method will
// return when any operations registered above have occurred, the
// thread has been interrupted, etc.
try {
while (!stop) {
if (selector.select() > 0) {
if (stop) {
break;
}
// Someone is ready for I/O, get the ready keys
Set readyKeys = selector.selectedKeys();
Iterator it = readyKeys.iterator();
// Walk through the ready keys collection and process date requests.
while (it.hasNext()) {
SelectionKey sk = (SelectionKey) it.next();
it.remove();
SocketChannel readChannel = null;
TcpAddress incomingAddress = null;
if (sk.isAcceptable()) {
// The key indexes into the selector so you
// can retrieve the socket that's ready for I/O
ServerSocketChannel nextReady =
(ServerSocketChannel) sk.channel();
// Accept the date request and send back the date string
Socket s = nextReady.accept().socket();
readChannel = s.getChannel();
readChannel.register(selector, SelectionKey.OP_READ);
incomingAddress = new TcpAddress(s.getInetAddress(), s.getPort());
SocketEntry entry = new SocketEntry(incomingAddress, s);
sockets.put(incomingAddress, entry);
timeoutSocket(entry);
}
else if (sk.isReadable()) {
readChannel = (SocketChannel) sk.channel();
incomingAddress =
new TcpAddress(readChannel.socket().getInetAddress(),
readChannel.socket().getPort());
}
else if (sk.isConnectable()) {
try {
SocketEntry entry = (SocketEntry) sk.attachment();
SocketChannel sc = (SocketChannel) sk.channel();
if (sc.finishConnect()) {
logger.debug("Connected to " + entry.getPeerAddress());
// make sure conncetion is closed if not used for timeout
// micro seconds
timeoutSocket(entry);
sc.register(selector, SelectionKey.OP_WRITE, entry);
}
}
catch (IOException iox) {
logger.warn(iox);
}
}
else if (sk.isWritable()) {
try {
SocketEntry entry = (SocketEntry) sk.attachment();
SocketChannel sc = (SocketChannel) sk.channel();
if (entry != null) {
writeMessage(entry, sc);
}
}
catch (IOException iox) {
logger.warn(iox);
}
}
if (readChannel != null) {
try {
readMessage(sk, readChannel, incomingAddress);
}
catch (IOException iox) {
logger.warn(iox);
}
}
}
}
processPending();
}
if (ssc != null) {
ssc.close();
}
}
catch (IOException iox) {
logger.error(iox);
lastError = iox;
}
}
private void readMessage(SelectionKey sk, SocketChannel readChannel,
TcpAddress incomingAddress) throws IOException {
// note that socket has been used
SocketEntry entry = (SocketEntry) sockets.get(incomingAddress);
if (entry != null) {
entry.used();
ByteBuffer readBuffer = entry.getReadBuffer();
if (readBuffer != null) {
readChannel.read(readBuffer);
if (readBuffer.hasRemaining()) {
readChannel.register(selector, SelectionKey.OP_READ, entry);
}
else {
dispatchMessage(incomingAddress, readBuffer, readBuffer.capacity());
}
return;
}
}
ByteBuffer byteBuffer = ByteBuffer.wrap(buf);
byteBuffer.limit(MINIMUM_HEADER_LENGTH);
long bytesRead = readChannel.read(byteBuffer);
if (logger.isDebugEnabled()) {
logger.debug("Reading header "+bytesRead+" bytes from " +
incomingAddress);
}
int messageLength = Integer.MIN_VALUE;
int headerLength = 0;
if (bytesRead == MINIMUM_HEADER_LENGTH) {
MutableByte type = new MutableByte();
BERInputStream is = new BERInputStream(ByteBuffer.wrap(buf));
messageLength = BER.decodeHeader(is, type);
if ((messageLength > getMaxInboundMessageSize()) || (messageLength <= 0)) {
logger.error("Received message length "+messageLength+
" is greater than inboundBufferSize "+
getMaxInboundMessageSize());
synchronized(entry) {
entry.getSocket().close();
logger.info("Socket to "+entry.getPeerAddress()+
" closed due to an error");
}
}
else {
headerLength = (int) is.getPosition();
byteBuffer.limit(messageLength + headerLength);
bytesRead += readChannel.read(byteBuffer);
if (bytesRead == messageLength + headerLength) {
dispatchMessage(incomingAddress, byteBuffer, bytesRead);
}
else {
byte[] message = new byte[byteBuffer.limit()];
byteBuffer.flip();
byteBuffer.get(message, 0,
byteBuffer.limit() - byteBuffer.remaining());
entry.setReadBuffer(ByteBuffer.wrap(message));
}
readChannel.register(selector, SelectionKey.OP_READ, entry);
}
}
else if (bytesRead < 0) {
logger.debug("Socket closed remotely");
sk.cancel();
readChannel.close();
}
}
private void dispatchMessage(TcpAddress incomingAddress,
ByteBuffer byteBuffer, long bytesRead) {
byteBuffer.flip();
if (logger.isDebugEnabled()) {
logger.debug("Received message from " + incomingAddress +
" with length " + bytesRead + ": " +
new OctetString(byteBuffer.array(), 0,
(int)bytesRead).toHexString());
}
for (int i = 0; i < messageDispatcher.size(); i++) {
MessageDispatcher dispatcher;
synchronized (DefaultTcpTransportMapping.this) {
dispatcher = (MessageDispatcher) messageDispatcher.get(i);
}
ByteBuffer bis;
if (isAsyncMsgProcessingSupported()) {
byte[] bytes = new byte[(int)bytesRead];
System.arraycopy(byteBuffer.array(), 0, bytes, 0, (int)bytesRead);
bis = ByteBuffer.wrap(bytes);
}
else {
bis = ByteBuffer.wrap(byteBuffer.array(),
0, (int) bytesRead);
}
dispatcher.processMessage(DefaultTcpTransportMapping.this,
incomingAddress,
new BERInputStream(bis));
}
}
private void writeMessage(SocketEntry entry, SocketChannel sc) throws
IOException {
byte[] message = entry.nextMessage();
ByteBuffer buffer = ByteBuffer.wrap(message);
sc.write(buffer);
if (logger.isDebugEnabled()) {
logger.debug("Send message with length " +
message.length + " to " +
entry.getPeerAddress() + ": " +
new OctetString(message).toHexString());
}
sc.register(selector, SelectionKey.OP_READ);
}
public void close() {
stop = true;
server.interrupt();
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -