// File: btpeer.java (code-viewer page header; the "font size" control text was not part of the source)
/*
* @(#)BTPeer.java ver 1.2 6/20/2005
*
* Copyright 2005 Weishuai Yang (wyang@cs.binghamton.edu).
* All rights reserved.
*
*/
package gps.protocol.BT;
import java.awt.Color;
import java.awt.Graphics;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import gps.event.SimEvent;
import gps.event.SimEventHandler;
import gps.protocol.Document;
import gps.protocol.Peer;
import gps.protocol.BT.algorithm.BTAlgorithmPeerSelectionAtPeer;
import gps.protocol.BT.param.BTPeerMessage;
import gps.protocol.BT.param.BTTorrent;
import gps.util.LogFormatter;
/**
* BT Peer
*
* @author Weishuai Yang
* @version 1.2, 6/20/2005
*/
public class BTPeer extends Peer {
    /**
     * maximum number of sessions this peer holds before it refuses to open
     * a new one in response to an incoming handshake
     */
    public static final int MAX_ALLOWED_SESSION_NUM=10;
    /**
     * maximum number of connections a session on this peer holds before a
     * handshake from a not-yet-connected node is ignored
     */
    public static final int MAX_ALLOWED_CONNECTION_NUM=80;
    /**
     * maximum out bandwidth, not used now
     */
    public static final double MAX_OUT_BANDWIDTH_PER_NODE=300000;
    /**
     * current sessions indexed by document hash key
     */
    private LinkedHashMap mSessions=null;
    /**
     * current next connections to be unchoked; this class never populates
     * it and only clears it in {@link #reset()}
     */
    private LinkedHashMap mUnchokeNext=null;
    /**
     * peer selection algorithm
     */
    public BTAlgorithmPeerSelectionAtPeer mPeerSelection=new BTAlgorithmPeerSelectionAtPeer(getID());

    /**
     * constructs a new BTPeer with id provided
     * @param i BTPeer id
     */
    public BTPeer(int i) {
        super(i);
        mSessions=new LinkedHashMap();
    }

    /**
     * resets internal values: drops all sessions, forgets the pending
     * unchoke map and installs a fresh peer selection algorithm
     */
    public void reset(){
        super.reset();
        // NOTE(review): the null check suggests reset() can run before the
        // constructor body assigns mSessions (e.g. from the superclass
        // constructor) -- confirm against Peer.
        if(mSessions!=null){
            mSessions.clear();
        }
        mUnchokeNext=null;
        mPeerSelection=new BTAlgorithmPeerSelectionAtPeer(getID());
    }

    /**
     * does some BTPeer specific drawing on the graph: a green dot labelled
     * with the peer id and, underneath, the keys of all documents held by
     * this peer (incomplete ones are followed by their finished percentage)
     * @param g Graphics object
     * @param x x position
     * @param y y position
     */
    public void agentDraw(Graphics g, int x, int y){
        g.setColor( Color.green );
        g.fillOval( x-5, y-5, 10, 10 );
        g.drawString("Peer("+mAgentID+")",x-16,y-6 );
        if(mDocDBbyKey.isEmpty()){
            return;
        }
        StringBuffer label=new StringBuffer();
        // iterate over a snapshot of the keys
        ArrayList keys=new ArrayList(mDocDBbyKey.keySet());
        for(int i=0;i<keys.size();i++){
            String key=(String)keys.get(i);
            BTDocument doc=(BTDocument)mDocDBbyKey.get(key);
            label.append(key);
            if(!doc.isWhole()){
                label.append("[");
                label.append(LogFormatter.sprintf("%.1f%",doc.getFinishedPercent()*100));
                label.append("]");
            }
            label.append(" ");
        }
        g.setColor( Color.black );
        g.drawString(label.toString(), x-12, y+12);
    }

    /**
     * gets all sessions on this peer
     * @return the live hash map of sessions (not a copy), indexed by document hash key
     */
    public LinkedHashMap getSessions(){
        return mSessions;
    }

    /**
     * gets session for a specific document
     * @param key document hash key
     * @return session object, or null if there is no session for that key
     */
    public BTSession getSession(String key){
        return (BTSession)mSessions.get(key);
    }

    /**
     * removes session for a specific document
     * @param key document hash key
     * @return the removed session object, or null if there was none
     */
    public BTSession removeSession(String key){
        return (BTSession)mSessions.remove(key);
    }

    /**
     * retrieves a document by key
     *
     * @param key document hash key
     * @return BTDocument object, or null if this peer does not hold it
     */
    public BTDocument getBTDocument(String key){
        return (BTDocument)mDocDBbyKey.get(key);
    }

    /**
     * handles peer level events
     *
     * @param e the event to be handled
     * @return true if already handled
     *
     */
    public boolean handle(SimEvent e){
        if(super.handle(e)){
            return true;
        }
        switch(e.getType()){
            /*
             * In the following three event handlers (handlePublish,
             * handleDownload and handleRandomDownload) nodes are identified
             * by their ID, since those events usually come from the
             * configuration file. All other events identify nodes by a
             * reference to the instance.
             */
            case BTEvent.USER_PUBLISH:
                handlePublish((BTEvent)e);
                return true;
            case BTEvent.USER_DOWNLOAD:
                handleDownload((BTEvent)e);
                return true;
            case BTEvent.USER_RANDOM_DOWNLOAD:
                handleRandomDownload((BTEvent)e);
                return true;
            // BTPeer handles peer messages, but only handshaking
            case BTEvent.PEER_MESSAGE:
                handlePeerMessage((BTEvent)e);
                return true;
            default:
                mDebugLog.warning("received event "+e+" unprocessed");
        }
        return false;
    }

    /**
     * handles events for publishing a specific
     * document to a specified tracker at a specific time.
     *
     * @param e the event object; its param is the tracker id (Integer) and
     *          its additional param is the document hash key (String)
     */
    public void handlePublish(BTEvent e){
        int trackerID=((Integer)e.getParam()).intValue();
        String docHashKey=(String)e.getAddParam();
        publish(docHashKey, (BTTracker)BT.getInstance().getAgent(trackerID));
    }

    /**
     * handles events for downloading a specific
     * document from a specified tracker at a specific time.
     * If the tracker has no torrent for the requested key the request is
     * logged and dropped, mirroring {@link #handleRandomDownload(BTEvent)}.
     *
     * @param e the event object; its param is the tracker id (Integer) and
     *          its additional param is the document hash key (String)
     */
    public void handleDownload(BTEvent e){
        int trackerID=((Integer)e.getParam()).intValue();
        String docHashKey=(String)e.getAddParam();
        // NOTE(review): the returned delay is not used; the call is kept only
        // in case getDelayAgent has side effects -- confirm and remove.
        BT.getInstance().getDelayAgent(getID(),trackerID);
        BTTracker tracker=(BTTracker)BT.getInstance().getAgent(trackerID);
        //download a torrent metainfo file from web pages
        BTTorrent btt=tracker.getTorrent(docHashKey);
        if(btt==null){
            // previously this fell through and failed on btt.mDocHashKey
            mDebugLog.info(LogFormatter.sprintf("%.9f", mScheduler.getCurrent())+": "+tracker+" can't find document "+docHashKey+" for "+this+" to download");
            return;
        }
        BTSession session=sessionForTorrent(btt);
        session.queryTracker();
        mTraceLog.info(LogFormatter.sprintf("%.9f", mScheduler.getCurrent())+": "+this+" -- Download -- "+btt.mDocHashKey+" -- "+tracker+" -- Starts");
    }

    /**
     * handles events for downloading a random
     * document from a specified tracker at a specific time.
     *
     * @param e the event object; its param is the tracker id (Integer)
     */
    public void handleRandomDownload(BTEvent e){
        int trackerID=((Integer)e.getParam()).intValue();
        // NOTE(review): the returned delay is not used; the call is kept only
        // in case getDelayAgent has side effects -- confirm and remove.
        BT.getInstance().getDelayAgent(getID(),trackerID);
        BTTracker tracker=(BTTracker)BT.getInstance().getAgent(trackerID);
        //download a random torrent metainfo file from web pages
        BTTorrent btt=tracker.getRandomTorrent(mDocDBbyKey);
        if(btt==null) {
            mDebugLog.info(LogFormatter.sprintf("%.9f", mScheduler.getCurrent())+": "+tracker+" can't find a random document for "+this+" to download");
            return;
        }
        BTSession session=sessionForTorrent(btt);
        session.queryTracker();
        mTraceLog.info(LogFormatter.sprintf("%.9f", mScheduler.getCurrent())+": "+this+" -- Random-Download -- "+btt.mDocHashKey+" -- "+tracker+" -- Starts");
    }

    /**
     * returns the session for the torrent's document, updating the torrent
     * of an existing session or registering a new session if none exists
     *
     * @param btt the torrent to download, must not be null
     * @return the session registered under the torrent's document hash key
     */
    private BTSession sessionForTorrent(BTTorrent btt){
        BTSession session=null;
        if(mSessions.containsKey(btt.mDocHashKey)){
            session=(BTSession)mSessions.get(btt.mDocHashKey);
            session.setTorrent(btt);
        }
        else {
            session=new BTSession(btt, this);
            mSessions.put(btt.mDocHashKey,session);
        }
        return session;
    }

    /**
     * handles HAND_SHAKING, all other peer messages are not supposed to be sent to BTPeer.
     * They are handled by BTSession and BTConnection
     *
     * @param e is the event object; its additional param is the BTPeerMessage
     */
    public void handlePeerMessage(BTEvent e){
        BTPeerMessage btpm=(BTPeerMessage)e.getAddParam();
        //peer object only processes hand shaking, other type peer messages are processed by session object
        if(btpm.mType!=BTPeerMessage.HAND_SHAKING){
            return;
        }
        handleHandShaking(e);
    }

    /**
     * handleHandShaking makes sure each side has a session. The initiator
     * should already have a session, but the other side may or may not have
     * one yet. If the receiver doesn't have one, one is set up for it, and
     * a bitfield is sent in response to the initiator's session.
     * For simplicity, the handshake already includes the bitfield in the
     * message, and the receiver of the handshake replies with a bitfield
     * message and sets up the connection. The initiator also sets up its
     * connection after receiving the bitfield.
     * <p>
     * The handshake is silently ignored when this peer does not hold the
     * document, when a new session would be needed but
     * {@link #MAX_ALLOWED_SESSION_NUM} is reached, or when a new connection
     * would be needed but the session already has
     * {@link #MAX_ALLOWED_CONNECTION_NUM} connections.
     *
     * @param e is the event object; its param is the initiator's BTSession
     *          and its additional param is the handshake BTPeerMessage
     */
    public void handleHandShaking(BTEvent e){
        BTSession bts=(BTSession)e.getParam();
        BTPeerMessage btpm=(BTPeerMessage)e.getAddParam();
        mDebugLog.info(mScheduler.getCurrent()+": "+toString()+"received handshaking from "+bts.getAgent());

        BTDocument btd=getBTDocument(btpm.mDocHashKey);
        if(btd==null){
            //if this node doesn't have this document, do nothing
            return;
        }

        BTSession session=null;
        if(mSessions.containsKey(btpm.mDocHashKey)){
            session=(BTSession)mSessions.get(btpm.mDocHashKey);
        }
        else{
            //this peer already has too many sessions and doesn't want others to download from it
            if(mSessions.size()>=MAX_ALLOWED_SESSION_NUM){
                return;
            }
            //no current session for this file: whether the file is complete or partial, create a session and serve as a seed
            session=new BTSession(btpm.mDocHashKey, this);
            mSessions.put(btpm.mDocHashKey, session);
        }

        //record the remote bitfield in the session's knowledge and find out
        //whether the remote node has any piece this peer is missing
        boolean interested=false;
        for(int i=0;i<btpm.mBitfield.length;i++){
            if(!btpm.mBitfield[i]){
                continue;
            }
            if(!btd.mPieces[i]){
                //that node has this piece, but this node doesn't have it
                interested=true;
            }
            session.addNodeHasPiece(bts.getAgent(), i);
        }

        /*
         * set up a connection
         * note: for simplicity, each connection only serves one document,
         * so it's possible that there is more than one connection between 2 nodes.
         */
        BTSocket connection=null;
        if(!session.getConnections().containsKey(bts.getAgent())){
            //this session already has too many connections, ignore the newcomer
            if(session.getConnections().size()>=MAX_ALLOWED_CONNECTION_NUM){
                return;
            }
            mDebugLog.info(mScheduler.getCurrent()+": "+toString()+" creating a new connection to "+bts.getAgent());
            //no connection to that node for this document yet, create one
            connection=new BTSocket(session);
            session.getConnections().put(bts.getAgent(), connection);
            //set up the timeout event
            BTEvent timeoutEvent=new BTEvent(mScheduler.getCurrent()+BTSocket.CONNECTION_TIMEOUT, BTEvent.CONNECTION_TIMEOUT, session, bts.getAgent());
            mScheduler.enqueue(timeoutEvent);
            connection.mTimeout=timeoutEvent;
        }
        else{
            //already connected: push the timeout back and drop the pending keep alive
            connection=(BTSocket)session.getConnections().get(bts.getAgent());
            mScheduler.cancel(connection.mTimeout);
            connection.mTimeout.setTimeStamp(mScheduler.getCurrent()+BTSocket.CONNECTION_TIMEOUT);
            mScheduler.enqueue(connection.mTimeout);
            if(connection.mKeepAlive!=null){
                mScheduler.cancel(connection.mKeepAlive);
            }
        }

        //set up the keep alive event, to be processed by the connection; when triggered, it sends a keep alive message
        BTEvent keepAliveEvent=new BTEvent(mScheduler.getCurrent()+BTSocket.CONNECTION_KEEP_ALIVE, BTEvent.CONNECTION_KEEP_ALIVE, connection, connection);
        mScheduler.enqueue(keepAliveEvent);
        connection.mKeepAlive=keepAliveEvent;
        connection.mBitField=btpm.mBitfield;

        //respond with a bitfield message
        BTPeerMessage reply=new BTPeerMessage(BTPeerMessage.BITFIELD);
        reply.mDocHashKey=btpm.mDocHashKey;
        reply.mBitfield=(boolean[])session.getDocument().getPieces().clone();
        reply.mInterested=interested;
        //this is only used in the bitfield message to set up connections to each other
        reply.mConnection=connection;
        connection.mAmChoking=true;

        //deliver the reply to the initiator's session after the network delay
        double t=mScheduler.getCurrent()+BT.getInstance().getDelayAgent(this,bts.getAgent());
        BTEvent replyEvent=new BTEvent(t, BTEvent.PEER_MESSAGE, (SimEventHandler)bts, session);
        replyEvent.setAddParam(reply);
        mScheduler.enqueue(replyEvent);
    }

    /**
     * publishes the document to the tracker. If no document is registered
     * under the given hash key, a warning is logged and nothing is published.
     *
     * @param docHashkey the hashkey of the document
     * @param tracker the tracker instance
     */
    public void publish(String docHashkey, BTTracker tracker){
        //publish actually is web posting to web server, which is out of the scope of BT
        BTDocument doc=(BTDocument)Document.mDocumentListbyKey.get(docHashkey);
        if(doc==null){
            mDebugLog.warning(LogFormatter.sprintf("%.9f", mScheduler.getCurrent())+": "+this+" can't publish unknown document "+docHashkey+" to "+tracker);
            return;
        }
        tracker.addTorrent(new BTTorrent(doc, tracker));
        mTraceLog.info(LogFormatter.sprintf("%.9f", mScheduler.getCurrent())+": "+this+" -- Publish -- "+docHashkey+" -- "+tracker);
    }

    /**
     * gets string description
     * @return string description of the form "BTPeer(id)"
     */
    public String toString(){
        return "BTPeer("+mAgentID+")";
    }
}
// End of listing. (Code-viewer keyboard-shortcut help text that followed here was not part of the source file.)