📄 connection.java
字号:
// TODO return a boolean indicating if the message was enqueued
/**
* Send a priority message. Priority messages are not subject to flow-control
* dropping.
*
* @param m message
*/
void prioritySend(Message m) throws IOException
{
    // Once shutdown has been flagged the message is silently discarded.
    if ( !shutdownFlag )
    {
        // priority == true: enqueueMessage skips its per-type backlog checks
        enqueueMessage(m, true);
    }
}
/**
* Sends a Message through this connection
*
* @param m message
*/
void send(Message m) throws IOException
{
    // Once shutdown has been flagged the message is silently discarded.
    if ( !shutdownFlag )
    {
        // priority == false: the message may be dropped by the backlog checks
        // performed in enqueueMessage
        enqueueMessage(m, false);
    }
}
/**
* Sends a message down the connection and sends any response
* to <code>MessageReceiver</code>
*
*/
void sendAndReceive(Message m,
                    MessageReceiver receiver) throws IOException
{
    // Nothing is logged, routed or sent once shutdown has been flagged.
    if ( !shutdownFlag )
    {
        String debugText = "Connection sendAndReceive: " +
                           m.getType() +
                           " to to " +
                           socket.getInetAddress().getHostAddress();
        Log.getLog().logDebug(debugText);

        // Hand the receiver to the router first, then queue the message as
        // a priority send.
        router.routeBack(m, receiver);
        prioritySend(m);
    }
}
/**
* Adds a message to the send queue, subject to dropping due to a message
* backlog
*
*/
void enqueueMessage(Message message, boolean priority)
{
    // This check applies to priority and non-priority messages alike.
    if ( droppedCount > ( .5 * outputCount ) )
    {
        // drop this connection, it is hung or performing at less than half our
        // send rate
        shutdown();
        return;
    }

    int type = message.getType();
    int backlogSize = messageBacklog.size();

    // Only non-priority messages are subject to the per-type backlog limits.
    // Message types not listed in the switch are never dropped here.
    if ( !priority )
    {
        switch (type)
        {
            case Message.PING:
            {
                if ( backlogSize > BACKLOG_PING_LEVEL )
                {
                    droppedCount++;
                    return;
                }
                break;
            }
            case Message.PONG:
            {
                if ( backlogSize > BACKLOG_PONG_LEVEL )
                {
                    droppedCount++;
                    return;
                }
                break;
            }
            case Message.QUERY:
            {
                if ( backlogSize > BACKLOG_QUERY_LEVEL )
                {
                    droppedCount++;
                    return;
                }
                break;
            }
            case Message.QUERYREPLY:
            {
                if ( backlogSize > BACKLOG_QUERYREPLY_LEVEL )
                {
                    droppedCount++;
                    return;
                }
                break;
            }
            case Message.PUSH:
            {
                if ( backlogSize > BACKLOG_PUSH_LEVEL )
                {
                    droppedCount++;
                    // give up on this connection
                    shutdown();
                    return;
                }
                break;
            }
        }
    }

    MessageData messageData = new MessageData(message, priority);
    if ( priority )
    {
        // priority messages go to the head of the backlog, ahead of
        // everything already queued
        messageBacklog.add(0, messageData);
    }
    else
    {
        // Append non-priority messages at the end of the list.
        // The previous code inserted at index (size - 1), which placed the
        // new message BEFORE the current last element and so reordered
        // consecutive non-priority messages.
        synchronized ( messageBacklog )
        {
            messageBacklog.add(messageData);
        }
    }

    // wake the sender thread in case it is waiting for work
    synchronized ( asyncSender )
    {
        asyncSender.notify();
    }
}
/**
* Handles a serious error on the connection
*
*/
void handleConnectionError(Exception e)
{
    String notice = "Shutting down connection: " + host;
    Log.getLog().logInformation(notice);

    // Mark the connection as failed before tearing it down.
    status = STATUS_FAILED;

    // The exception is optional; log it only when one was supplied.
    if ( e != null )
    {
        Log.getLog().log(e);
    }
    shutdown();
}
/**
* Get the connected host
*
* @return host name
*/
public String getConnectedServant()
{
    // plain accessor for the host field
    return this.host;
}
/**
* Get the current status of the connection
*
* @return status
*/
public int getStatus()
{
    // plain accessor for the status field
    return this.status;
}
/**
* Get the type of connection, incoming or outgoing
*
* @return connection type
*/
public int getType()
{
    // plain accessor for the type field
    return this.type;
}
/**
* Get the message output count
*
* @return output
*/
public int getMessageOutput()
{
    // outputCount is incremented by the sender thread for each message it writes
    return this.outputCount;
}
/**
* Get the message input count
*
* @return input
*/
public int getMessageInput()
{
    // plain accessor for the inputCount field
    return this.inputCount;
}
/**
* Get the number of messages dropped on this connection
*
* @return dropcount
*/
public int getMessageDropCount()
{
    // droppedCount is incremented in enqueueMessage whenever a message is discarded
    return this.droppedCount;
}
/**
* Get the length of time the connection has lived
*
* @return time in seconds
*/
public int getUpTime()
{
    final long now = System.currentTimeMillis();
    final long elapsedMillis = now - createTime;
    // whole seconds only; the fractional part is truncated
    return (int) (elapsedMillis / 1000);
}
/**
* Returns the timestamp of the last send
*
* @return timestamp
*/
public long getSendTime()
{
    // sendTime is stamped by the sender thread just before each write
    return this.sendTime;
}
/**
* Message data stored in the message backlog
*
*/
class MessageData
{
    // the queued message
    private Message message;
    // true when the message was enqueued as a priority message
    private boolean priority;

    /**
     * Creates a backlog entry pairing a message with its priority flag.
     *
     * @param message message to queue
     * @param priority <code>true</code> if the message is exempt from
     * backlog-based dropping
     */
    MessageData(Message message, boolean priority)
    {
        this.priority = priority;
        this.message = message;
    }

    /**
     * Returns the wrapped message.
     *
     * @return the message given at construction
     */
    Message getMessage()
    {
        return this.message;
    }

    /**
     * Tells whether the entry was queued as a priority message.
     *
     * @return the priority flag given at construction
     */
    boolean isPriority()
    {
        return this.priority;
    }
}
/**
* Provides a mechanism to send a message and handle the problem of
* blocking on write, due to an unresponsive servant on the connection
*
*/
class AsyncSender extends Thread
{
    // volatile: set by whichever thread calls shutdown(), read by the
    // sender loop running on this thread
    private volatile boolean shutdown = false;

    AsyncSender()
    {
        super("AsyncSender");
    }

    /**
     * Waits until the message backlog is non-empty or shutdown has been
     * requested, then removes and returns the message at the head of the
     * backlog.
     *
     * @return the next message to send, or <code>null</code> if shutdown
     * has been requested
     */
    public Message getMessage()
    {
        // The emptiness check and the wait() share one monitor so that a
        // notify() issued between the check and the wait cannot be lost.
        // NOTE(review): this assumes producers notify on this same
        // AsyncSender instance after adding to the backlog - confirm.
        synchronized ( this )
        {
            while ( 0 == messageBacklog.size() && !shutdown )
            {
                try
                {
                    wait();
                }
                catch (InterruptedException ie)
                {
                    // shutdown() interrupts this thread; the loop condition
                    // re-checks the shutdown flag, so nothing else to do
                }
            }
        }
        if ( shutdown )
        {
            return null;
        }
        return ((MessageData)messageBacklog.remove(0)).getMessage();
    }

    /**
     * Requests that the sender loop stop and interrupts the thread so that
     * a pending wait in getMessage() returns.
     */
    public void shutdown()
    {
        shutdown = true;
        interrupt();
    }

    /**
     * Sender loop: takes messages from the backlog one at a time and writes
     * them to the connection's output stream until shutdown is requested or
     * a write fails.
     */
    public void run()
    {
        while (!shutdown)
        {
            Message message = getMessage();
            if ( null == message )
            {
                // getMessage() returns null only after shutdown has been
                // requested; previously this fell through and raised a
                // NullPointerException that was logged as an error
                break;
            }
            try
            {
                Connection.this.sendTime = System.currentTimeMillis();
                byte[] messageBytes = message.getByteArray();
                outputCount++;
                Connection.this.outputStream.write(messageBytes,
                                                   0,
                                                   messageBytes.length);
                Connection.this.outputStream.flush();
            }
            catch (Exception e)
            {
                // NOTE(review): this unqualified call resolves to
                // AsyncSender.shutdown(), not the enclosing connection's
                // shutdown() - confirm that stopping only the sender
                // thread on a failed write is intended
                shutdown();
                Log.getLog().log(e);
            }
        }
    }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -