📄 packetreceiver.java
字号:
package mddb.client;
import java.io.*;
import java.net.*;
import java.util.*;
import mddb.packets.*;
import java.awt.Point;
import mddb.client.components.*;
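/* Packet receiver -- listens for packets/frames arriving from the network and processes them.
Holds a MulticastSocket that is used to receive the network datagrams.
*/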
public class PacketReceiver
extends Thread {
private MulticastSocket receiver;
private ClientNode srcNode; //指向本身Node对象的引用
/**
 * Creates a receiver for the given node: opens a multicast socket on port
 * 9999 and joins the group 231.0.0.1.
 *
 * Every I/O failure during set-up is reported to the caller as a
 * {@link SocketException} carrying the original exception as its cause, so
 * a receiver is never handed out with a missing or un-joined socket. A
 * socket that was opened before the failure is closed again.
 *
 * @param node the node this receiver belongs to; received packets are
 *             checked against and applied to this node
 * @throws SocketException if the socket cannot be opened or the multicast
 *                         group cannot be joined
 */
public PacketReceiver( ClientNode node ) throws SocketException {
    srcNode = node;
    MulticastSocket socket = null;
    try {
        socket = new MulticastSocket( 9999 );
        InetAddress group = InetAddress.getByName( "231.0.0.1" );
        socket.joinGroup( group );
        // Publish the socket only once it is fully set up.
        receiver = socket;
    }
    catch( IOException ioe ) {
        // SocketException is an IOException, so this covers both cases.
        if( socket != null ) {
            socket.close();
        }
        SocketException failure = new SocketException( "网络包接收器初始化失败!" );
        // initCause keeps the root cause without needing the newer
        // (String, Throwable) constructor.
        failure.initCause( ioe );
        throw failure;
    }
}
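//Listens on the network, continuously receives the frames that arrive, assembles them into packets and hands them to this node's packet-receive handler
//Rather like the Windows message loop :>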
/**
 * Receive loop of the thread.
 *
 * Repeatedly reads one datagram (at most 1024 bytes), decodes it as text,
 * takes the first '@'-separated token as the numeric frame type, converts
 * the remaining tokens into a packet via extractFrameToPacket and passes
 * the result to OnReceivePacket. The loop pauses 10 ms between datagrams.
 *
 * A frame that cannot be parsed or whose handling throws a runtime
 * exception is logged and dropped; it does not end the loop. The loop ends
 * when the thread is interrupted while sleeping or when the socket reports
 * an I/O error. The socket is closed on every exit path.
 */
public void run() {
    try {
        // Receive buffer, fixed at 1 KiB.
        byte[] buffer = new byte[1024];
        DatagramPacket dgp = new DatagramPacket( buffer, buffer.length );
        while( true ) {
            // Defensive: make the whole buffer available again, whatever
            // length the previous datagram left in the packet.
            dgp.setLength( buffer.length );
            receiver.receive( dgp );
            int length = dgp.getLength();
            try {
                String frame = new String( dgp.getData(), dgp.getOffset(),
                                           length );
                StringTokenizer st = new StringTokenizer( frame, "@" );
                // First token is the frame type.
                int frameType = Integer.parseInt( st.nextToken() );
                OnReceivePacket( frameType,
                                 extractFrameToPacket( st ),
                                 length );
            }
            catch( RuntimeException re ) {
                // A malformed or unprocessable frame must not stop the
                // receiver; report it and carry on with the next datagram.
                System.err.println( "收到无法处理的帧,已丢弃--PacketReceiver中" );
                re.printStackTrace();
            }
            // Pause 10 ms before the next receive.
            Thread.sleep( 10 );
        }
    }
    catch( InterruptedException ipe ) {
        // Interrupted while sleeping: leave the loop quietly.
    }
    catch( IOException ioe ) {
        ioe.printStackTrace();
    }
    finally {
        if( receiver != null ) {
            receiver.close();
        }
    }
}
//包处理函数--分析包,将包发给不同的层进行处理,或转发不属于本节点的包
/**
 * Filters an incoming packet and, for group frames, dispatches it to the
 * network-layer handler.
 *
 * A packet is ignored when it was sent by this node, when its path length
 * exceeds this node's valid range, or when its path length is
 * ParentNode.ANY_DISTANCE (a broadcast, per the original comment) and the
 * distance to the sender exceeds the valid range. A packet whose path
 * length is ParentNode.ONE_HOP_DISTANCE gets its path length replaced by
 * the distance to the sender. If the frame is a FRAME_GROUP_PKG and the
 * handler returns a packet, that packet is sent through the node's sender.
 *
 * @param frameType  frame type read from the datagram
 * @param packet     the decoded packet
 * @param dataLength length in bytes of the received datagram
 */
private void OnReceivePacket( int frameType, MddbPacket packet,
                              int dataLength ) {
    // Own packets and packets already beyond the valid range are dropped.
    if( packet.getSrcId() == srcNode.ID
        || packet.getPathLength() > srcNode.validRange ) {
        return;
    }
    // ANY_DISTANCE packets are dropped when the sender is out of range.
    if( packet.getPathLength() == ParentNode.ANY_DISTANCE
        && srcNode.getDistance( srcNode.getID(), packet.getSrcId() ) >
           srcNode.validRange ) {
        return;
    }
    // One-hop packets get the actual distance to the sender written back.
    if( packet.getPathLength() == ParentNode.ONE_HOP_DISTANCE ) {
        packet.setPathLength( srcNode.getDistance( srcNode.getID(),
                                                   packet.getSrcId() ) );
    }
    // Only group frames are handled here.
    if( frameType != MddbPacket.FRAME_GROUP_PKG ) {
        return;
    }
    MddbPacket answer = OnReceiveNetPacket( packet, dataLength );
    if( answer == null ) {
        return;
    }
    try {
        srcNode.pkgSender.sendNetPacket( answer,
                                         MddbPacket.FRAME_GROUP_PKG );
    }
    catch( IOException ioe ) {
        System.err.println( "包发送失败--PacketReceiver中" );
        ioe.printStackTrace();
    }
}
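//Network-packet handler: receives and processes network-layer packets ------------------------------------------
//Note: the frame-type token was already consumed from the tokenizer in run() before the packet was built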
/**
 * Dispatches a network-layer packet to the handler for its type and logs
 * its arrival on standard output.
 *
 * @param packet     the received packet
 * @param dataLength length in bytes of the received datagram; passed on to
 *                   the HELLO, RM and DELAY handlers
 * @return the packet returned by the HELLO or RM handler, or null for all
 *         other types, including types not listed here
 */
private MddbPacket OnReceiveNetPacket( MddbPacket packet, int dataLength ) {
    final int kind = packet.getType();
    if( kind == MddbPacket.NET_HELLO ) {
        logArrival( packet, "的HELLO包" );
        return helloProc( packet, dataLength );
    }
    if( kind == MddbPacket.NET_RM ) {
        logArrival( packet, "的RM包" );
        return rmProc( packet, dataLength );
    }
    // The remaining types are handled in place and produce no packet.
    if( kind == MddbPacket.NET_RHELLO ) {
        logArrival( packet, "的RHELLO包" );
        rehelloProc( packet );
    }
    else if( kind == MddbPacket.NET_RRM ) {
        logArrival( packet, "的RRM包" );
        rrmProc( packet );
    }
    else if( kind == MddbPacket.NET_DELAY ) {
        logArrival( packet, "的DELAY包" );
        delayProc( packet, dataLength );
    }
    else if( kind == MddbPacket.NET_MI ) {
        logArrival( packet, "的MI包" );
        miProc( packet );
    }
    else if( kind == MddbPacket.NET_NG ) {
        logArrival( packet, "的NG包" );
        ngProc( packet );
    }
    return null;
}

/** Prints the "node X received packet from Y" line; suffix names the packet type. */
private void logArrival( MddbPacket packet, String suffix ) {
    System.out.println( "我是节点" + srcNode.ID + " 我收到来自" +
                        packet.getSrcId() + suffix );
}
//全体公用函数==============================================================
//对路由求逆
/**
 * Builds a new list holding the entries of the packet's routine list in
 * reverse order. The packet's own list is not modified here.
 *
 * @param packet packet whose routine list is read
 * @return a new ArrayList with the entries last-to-first
 */
private ArrayList contraryRoutine( MddbPacket packet ) {
    ArrayList reversed = new ArrayList();
    // Walk from the last index down to 0.
    int idx = packet.getRoutineList().size();
    while( idx > 0 ) {
        idx--;
        reversed.add( packet.getRoutineList().get( idx ) );
    }
    return reversed;
}
//NG专用====================================================================
/**
 * Handles an NG packet: reads two '@'-separated numbers from the packet
 * data (adjacency id, then adjacency master id) and adds the triple
 * (sender id, adjacency id, adjacency master id) to the node's group
 * adjacency list while holding that list's monitor.
 *
 * @param packet the received NG packet
 */
private synchronized void ngProc( MddbPacket packet ) {
    synchronized( srcNode.groupAdjacencyList ) {
        StringTokenizer fields = new StringTokenizer( packet.getData(), "@" );
        final long neighbourId = Long.parseLong( fields.nextToken() );
        final long neighbourMasterId = Long.parseLong( fields.nextToken() );
        // Record <NG.SourceId, NG.AdjacencyId, NG.AdjacencyMasterId>.
        srcNode.groupAdjacencyList.add( packet.getSrcId(),
                                        neighbourId,
                                        neighbourMasterId );
    }
}
//MI专用====================================================================
/**
 * Handles an MI packet.
 *
 * First the node's neighbour list is brought up to date: every entry of
 * the neighbour table carried in the packet is applied with update(), and
 * the sender itself is added with its master id and source address.
 *
 * If this node is a master:
 *   - a sender that is in the member list but names a different master is
 *     removed from the member list, and its gateway entry (if any) is
 *     removed from the group adjacency list;
 *   - otherwise, a sender whose master is this node has its neighbour
 *     table merged into the group adjacency list;
 *   - otherwise the triple (this id, sender id, sender's master id) is
 *     added to the group adjacency list.
 *
 * If this node is not a master and the sender names a different master
 * than this node's, an NG packet with the data
 * "senderId@senderMasterId@" is sent towards this node's master.
 *
 * The sender's master id is parsed from the packet once and reused. The
 * NG payload is written with a single setData call so that both fields
 * read by ngProc are present however setData treats repeated calls.
 *
 * @param packet the received MI packet
 */
private synchronized void miProc( MddbPacket packet ) {
    // Apply the sender's neighbour table to our own neighbour list.
    NeighbourList senderNeighbours = nebrListInMIPack( packet );
    for( int i = 0; i < senderNeighbours.getSize(); i++ ) {
        Object[] entry = senderNeighbours.get( i );
        srcNode.nebrList.update( ( Long )entry[0], ( Long )entry[1],
                                 ( Point )entry[2] );
    }
    final long senderMasterId = masterOfMISender( packet );
    srcNode.nebrList.add( packet.getSrcId(), senderMasterId,
                          packet.getSrcAddr() );
    // ---------------------------------- neighbour list is up to date

    if( srcNode.isMaster() ) {
        if( srcNode.mbrList.contains( new Long( packet.getSrcId() ) ) &&
            senderMasterId != srcNode.ID ) {
            // A former member now reports another master: drop it.
            srcNode.mbrList.remove( packet.getSrcId() );
            int removePos = srcNode.groupAdjacencyList.containsGatewayId(
                packet.getSrcId(), MddbPacket.FRAME_GROUP_PKG );
            if( removePos >= 0 ) {
                srcNode.groupAdjacencyList.remove( removePos );
            }
        }
        else if( senderMasterId == srcNode.ID ) {
            // Sender belongs to this master: merge its adjacency info.
            srcNode.groupAdjacencyList.updateWithMIPacket( senderNeighbours,
                packet.getSrcId(), srcNode.ID );
        }
        else {
            // Record <ID, MI.SourceId, MI.MasterId> in the adjacency list.
            srcNode.groupAdjacencyList.add( srcNode.ID,
                                            packet.getSrcId(),
                                            senderMasterId );
        }
        return;
    }

    if( senderMasterId == srcNode.masterId ) {
        return;
    }

    // Sender is in another group: report it to our own master with NG.
    // NOTE(review): the original looked up getLocation( srcNode.ID ), i.e.
    // this node's own location, for a variable named masterPos; the
    // master's id is used here instead. Confirm against getLocation.
    Point masterPos = srcNode.getLocation( srcNode.masterId );
    ArrayList path = new ArrayList();
    path.add( srcNode.position );
    path.add( masterPos );
    MddbPacket ngPacket = new MddbPacket( new Point( srcNode.position ),
                                          srcNode.ID,
                                          masterPos,
                                          ParentNode.ONE_HOP_DISTANCE,
                                          path,
                                          MddbPacket.NET_NG,
                                          packet.getTimeStamp() );
    // Payload read by ngProc: adjacency id, then adjacency master id.
    ngPacket.setData( String.valueOf( packet.getSrcId() ) + "@" +
                      senderMasterId + "@" );
    try {
        srcNode.pkgSender.sendNetPacket( ngPacket,
                                         MddbPacket.FRAME_GROUP_PKG );
    }
    catch( IOException ioe ) {
        System.err.println( "包发送失败--PacketReceiver中" );
        ioe.printStackTrace();
    }
}
//MI包发送者的Master的ID
/**
 * Reads the master id of an MI packet's sender, which is the first
 * '@'-separated token of the packet data.
 *
 * @param packet the MI packet
 * @return the first token of the data parsed as a long
 */
private long masterOfMISender( MddbPacket packet ) {
    return Long.parseLong(
        new StringTokenizer( packet.getData(), "@" ).nextToken() );
}
//返回NeighbourList
/**
 * Decodes the neighbour table carried in an MI packet.
 *
 * The packet data is split on '@'. The first token (the master id) is
 * skipped; every following token is split on ':' into four numbers, read
 * in order as two longs and two ints, and added to the result as
 * (long, long, Point(int, int)).
 *
 * @param packet the MI packet
 * @return a new NeighbourList with one entry per decoded token
 */
private NeighbourList nebrListInMIPack( MddbPacket packet ) {
    NeighbourList decoded = new NeighbourList();
    StringTokenizer entries = new StringTokenizer( packet.getData(), "@" );
    entries.nextToken(); // skip the master id
    while( entries.hasMoreTokens() ) {
        StringTokenizer fields = new StringTokenizer( entries.nextToken(),
                                                      ":" );
        long first = Long.parseLong( fields.nextToken() );
        long second = Long.parseLong( fields.nextToken() );
        int x = Integer.parseInt( fields.nextToken() );
        int y = Integer.parseInt( fields.nextToken() );
        decoded.add( first, second, new Point( x, y ) );
    }
    return decoded;
}
//DELAY专用================================================================
private void delayProc( MddbPacket packet, int dataLength ) {
synchronized( srcNode ) {
if( packet.getTimeStamp() != srcNode.timeStamp ||
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -