📄 prudppackethandlerimpl.java
字号:
SHA1Hasher hasher = new SHA1Hasher();
String user_name = auth.getUserName();
String password = new String(auth.getPassword());
byte[] sha1_password;
if ( user_name.equals( "<internal>")){
sha1_password = Base64.decode(password);
}else{
sha1_password = hasher.calculateHash(password.getBytes());
}
byte[] user_bytes = new byte[8];
Arrays.fill( user_bytes, (byte)0);
for (int i=0;i<user_bytes.length&&i<user_name.length();i++){
user_bytes[i] = (byte)user_name.charAt(i);
}
hasher = new SHA1Hasher();
hasher.update( buffer );
hasher.update( user_bytes );
hasher.update( sha1_password );
byte[] overall_hash = hasher.getDigest();
//System.out.println("PRUDPHandler - auth = " + auth.getUserName() + "/" + new String(auth.getPassword()));
baos.write( user_bytes );
baos.write( overall_hash, 0, 8 );
buffer = baos.toByteArray();
}
DatagramPacket dg_packet = new DatagramPacket(buffer, buffer.length, destination_address );
PRUDPPacketHandlerRequestImpl request = new PRUDPPacketHandlerRequestImpl( receiver, timeout );
try{
requests_mon.enter();
requests.put( new Integer( request_packet.getTransactionId()), request );
}finally{
requests_mon.exit();
}
try{
// System.out.println( "Outgoing to " + dg_packet.getAddress());
if ( send_delay > 0 && priority != PRUDPPacketHandler.PRIORITY_IMMEDIATE ){
try{
send_queue_mon.enter();
if ( send_queue_data_size > MAX_SEND_QUEUE_DATA_SIZE ){
request.sent();
// synchronous write holding lock to block senders
socket.send( dg_packet );
stats.packetSent( buffer.length );
if ( TRACE_REQUESTS ){
Logger.log(new LogEvent(LOGID,
"PRUDPPacketHandler: request packet sent to "
+ destination_address + ": "
+ request_packet.getString()));
}
Thread.sleep( send_delay );
}else{
send_queue_data_size += dg_packet.getLength();
send_queues[priority].add( new Object[]{ dg_packet, request });
if ( TRACE_REQUESTS ){
String str = "";
for (int i=0;i<send_queues.length;i++){
str += (i==0?"":",") + send_queues[i].size();
}
System.out.println( "send queue sizes: " + str );
}
send_queue_sem.release();
if ( send_thread == null ){
send_thread =
new AEThread( "PRUDPPacketHandler:sender" )
{
public void
runSupport()
{
int[] consecutive_sends = new int[send_queues.length];
while( true ){
try{
send_queue_sem.reserve();
Object[] data;
int selected_priority = 0;
try{
send_queue_mon.enter();
// invariant: at least one queue must have an entry
for (int i=0;i<send_queues.length;i++){
List queue = send_queues[i];
int queue_size = queue.size();
if ( queue_size > 0 ){
selected_priority = i;
if ( consecutive_sends[i] >= 4 ||
( i < send_queues.length - 1 &&
send_queues[i+1].size() - queue_size > 500 )){
// too many consecutive or too imbalanced, see if there are
// lower priority queues with entries
consecutive_sends[i] = 0;
}else{
consecutive_sends[i]++;
break;
}
}else{
consecutive_sends[i] = 0;
}
}
data = (Object[])send_queues[selected_priority].remove(0);
}finally{
send_queue_mon.exit();
}
DatagramPacket p = (DatagramPacket)data[0];
PRUDPPacketHandlerRequestImpl r = (PRUDPPacketHandlerRequestImpl)data[1];
// mark as sent before sending in case send fails
// and we then rely on timeout to pick this up
send_queue_data_size -= p.getLength();
r.sent();
socket.send( p );
stats.packetSent( p.getLength() );
if ( TRACE_REQUESTS ){
Logger.log(new LogEvent(LOGID,
"PRUDPPacketHandler: request packet sent to "
+ p.getAddress()));
}
long delay = send_delay;
if ( selected_priority == PRIORITY_HIGH ){
delay = delay/2;
}
Thread.sleep( delay );
}catch( Throwable e ){
// get occasional send fails, not very interesting
Logger.log(
new LogEvent(
LOGID,
LogEvent.LT_WARNING,
"PRUDPPacketHandler: send failed: " + Debug.getNestedExceptionMessage(e)));
}
}
}
};
send_thread.setDaemon( true );
send_thread.start();
}
}
}finally{
send_queue_mon.exit();
}
}else{
request.sent();
socket.send( dg_packet );
// System.out.println( "sent:" + buffer.length );
stats.packetSent( buffer.length );
if ( TRACE_REQUESTS ){
Logger.log(new LogEvent(LOGID, "PRUDPPacketHandler: "
+ "request packet sent to " + destination_address + ": "
+ request_packet.getString()));
}
}
// if the send is ok then the request will be removed from the queue
// either when a reply comes back or when it gets timed-out
return( request );
}catch( Throwable e ){
// never got sent, remove it immediately
try{
requests_mon.enter();
requests.remove( new Integer( request_packet.getTransactionId()));
}finally{
requests_mon.exit();
}
throw( e );
}
}catch( PRUDPPacketHandlerException e ){
throw( e );
}catch( Throwable e ){
Logger.log(new LogEvent(LOGID,LogEvent.LT_ERROR,
"PRUDPPacketHandler: sendAndReceive to " + destination_address + " failed: " + Debug.getNestedExceptionMessage(e)));
throw( new PRUDPPacketHandlerException( "PRUDPPacketHandler:sendAndReceive failed", e ));
}
}
public void
send(
PRUDPPacket request_packet,
InetSocketAddress destination_address )
throws PRUDPPacketHandlerException
{
if ( socket == null ){
if ( init_error != null ){
throw( new PRUDPPacketHandlerException( "Transport unavailable", init_error ));
}
throw( new PRUDPPacketHandlerException( "Transport unavailable" ));
}
try{
checkTargetAddress( destination_address );
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream os = new DataOutputStream( baos );
request_packet.serialise(os);
byte[] buffer = baos.toByteArray();
request_packet.setSerialisedSize( buffer.length );
DatagramPacket dg_packet = new DatagramPacket(buffer, buffer.length, destination_address );
// System.out.println( "Outgoing to " + dg_packet.getAddress());
if ( TRACE_REQUESTS ){
Logger.log(new LogEvent(LOGID,
"PRUDPPacketHandler: reply packet sent: "
+ request_packet.getString()));
}
socket.send( dg_packet );
stats.packetSent( buffer.length );
// this is a reply to a request, no time delays considered here
}catch( PRUDPPacketHandlerException e ){
throw( e );
}catch( Throwable e ){
e.printStackTrace();
Logger.log(new LogEvent(LOGID, LogEvent.LT_ERROR, "PRUDPPacketHandler: send to " + destination_address + " failed: " + Debug.getNestedExceptionMessage(e)));
throw( new PRUDPPacketHandlerException( "PRUDPPacketHandler:send failed", e ));
}
}
protected void
checkTargetAddress(
InetSocketAddress address )
throws PRUDPPacketHandlerException
{
if ( address.getPort() == 0 ){
throw( new PRUDPPacketHandlerException( "Invalid port - 0" ));
}
}
public void
setDelays(
int _send_delay,
int _receive_delay,
int _queued_request_timeout )
{
send_delay = _send_delay;
receive_delay = _receive_delay;
// trim a bit off this limit to include processing time
queued_request_timeout = _queued_request_timeout-5000;
if ( queued_request_timeout < 5000 ){
queued_request_timeout = 5000;
}
}
public long
getSendQueueLength()
{
int res = 0;
for (int i=0;i<send_queues.length;i++){
res += send_queues[i].size();
}
return(res);
}
public long
getReceiveQueueLength()
{
return( recv_queue.size());
}
public void
primordialSend(
byte[] buffer,
InetSocketAddress target )
throws PRUDPPacketHandlerException
{
try{
checkTargetAddress( target );
DatagramPacket dg_packet = new DatagramPacket(buffer, buffer.length, target );
// System.out.println( "Outgoing to " + dg_packet.getAddress());
if ( TRACE_REQUESTS ){
Logger.log(new LogEvent(LOGID,
"PRUDPPacketHandler: reply packet sent: " + buffer.length + " to " + target ));
}
socket.send( dg_packet );
stats.primordialPacketSent( buffer.length );
}catch( Throwable e ){
throw( new PRUDPPacketHandlerException( e.getMessage()));
}
}
public PRUDPPacketHandlerStats
getStats()
{
return( stats );
}
protected void
destroy()
{
destroyed = true;
destroy_sem.reserve();
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -