⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socketpastrynodefactory.java

📁 pastry的java实现的2.0b版
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
   * blocks until a response is received, and then closes the socket and returns   * the response.   *   * @param address The address to send to   * @param message The message to send   * @return The response   * @exception IOException DESCRIBE THE EXCEPTION   */  protected Message getResponse(InetSocketAddress address, Message message)     throws IOException {    // create reader and writer    SocketChannelWriter writer;    SocketChannelReader reader;    writer = new SocketChannelWriter(environment, SourceRoute.build(new EpochInetSocketAddress(address,      EpochInetSocketAddress.EPOCH_UNKNOWN)));    reader = new SocketChannelReader(environment, SourceRoute.build(new EpochInetSocketAddress(address,      EpochInetSocketAddress.EPOCH_UNKNOWN)));    // bind to the appropriate port    SocketChannel channel = SocketChannel.open();    channel.configureBlocking(true);    channel.socket().connect(address, 20000);    channel.socket().setSoTimeout(20000);    writer.enqueue(TOTAL_HEADER);    writer.enqueue(message);    writer.write(channel);    SocketBuffer o = null;    while (o == null) {      o = reader.read(channel);    }    if (logger.level <= Logger.FINER) {      logger.log("SPNF.getResponse(): Closing " + channel);    }    channel.socket().shutdownOutput();    channel.socket().close();    channel.close();    if (logger.level <= Logger.FINER) {      logger.log("SPNF.getResponse(): Closed " + channel);    }    return o.deserialize(deserializer);  }  /**   * Gets the Response attribute of the SocketPastryNodeFactory object   *   * @param address DESCRIBE THE PARAMETER   * @param message DESCRIBE THE PARAMETER   * @param c DESCRIBE THE PARAMETER   * @return The Response value   */  protected CancellableTask getResponse(final InetSocketAddress address,                                              final Message message, final Continuation c) {    // create reader and writer    final SocketChannelWriter writer;    final SocketChannelReader reader;    writer = new SocketChannelWriter(environment, SourceRoute.build(new EpochInetSocketAddress(address,      EpochInetSocketAddress.EPOCH_UNKNOWN)));    reader = new SocketChannelReader(environment, SourceRoute.build(new EpochInetSocketAddress(address,      EpochInetSocketAddress.EPOCH_UNKNOWN)));    writer.enqueue(TOTAL_HEADER);    try {      writer.enqueue(message);    } catch (IOException ioe) {      c.receiveException(ioe);      return null;    }    // bind to the appropriate port    try {      final SocketChannel channel = SocketChannel.open();      channel.configureBlocking(false);      final SelectionKey key = environment.getSelectorManager().register(        channel,        new SelectionKeyHandler() {          public void connect(SelectionKey key) {            if (logger.level <= Logger.FINE) {              logger.log("SPNF.getResponse(" + address + "," + message                + ").connect()");            }            try {              if (channel.finishConnect()) {                key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);              }              if (logger.level <= Logger.FINE) {                logger.log("(SPNF) Found connectable channel - completed connection");              }              // channel.socket().connect(address, 20000);              // channel.socket().setSoTimeout(20000);            } catch (IOException ioe) {              handleException(ioe);            }          }          public void read(SelectionKey key) {            if (logger.level <= Logger.FINE) {              logger.log("SPNF.getResponse(" + address + "," + message                + ").read()");            }            try {              SocketBuffer o = null;              while (o == null) {                o = reader.read(channel);              }              channel.socket().close();              channel.close();              key.cancel();              c.receiveResult(o.deserialize(deserializer));            } catch (IOException ioe) {              handleException(ioe);            }          }          public void write(SelectionKey key) {            if (logger.level <= Logger.FINE) {              logger.log("SPNF.getResponse(" + address + "," + message                + ").write()");            }            try {              if (writer.write(channel)) {                key.interestOps(SelectionKey.OP_READ);              }            } catch (IOException ioe) {              handleException(ioe);            }          }          public void handleException(Exception e) {            try {              channel.socket().close();              channel.close();              channel.keyFor(environment.getSelectorManager().getSelector())                .cancel();            } catch (IOException ioe) {              if (logger.level <= Logger.WARNING) {                logger.logException("Error while trying requesting "                  + message + " from " + address, e);              }            } finally {              c.receiveException(e);            }          }        }, 0);      if (logger.level <= Logger.FINE) {        logger.log("(SPNF) Initiating socket connection to address " + address);      }      if (channel.connect(address)) {        key.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ);      } else {        key.interestOps(SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE          | SelectionKey.OP_READ);      }      return        new CancellableTask() {          public void run() {          }          public boolean cancel() {            environment.getSelectorManager().invoke(              new Runnable() {                public void run() {                  try {                    synchronized (key) {                      channel.socket().close();                      channel.close();//                  if (logger.level <= Logger.WARNING) {//                    if (!environment.getSelectorManager().isSelectorThread()) {//                      logger.logException("WARNING: cancelling key:"+key+" on the wrong thread.", new Exception("Stack Trace"));//                    }//                  }                      key.cancel();                    }//                return true;                  } catch (Exception ioe) {                    if (logger.level <= Logger.WARNING) {                      logger.logException("Error cancelling task.", ioe);                    }//                return false;                  }                }              });            return true;          }          public long scheduledExecutionTime() {            return 0;          }        };    } catch (IOException ioe) {      c.receiveException(ioe);      return null;    }  }  /**   * Method which constructs an InetSocketAddres for the local host with the   * specifed port number.   *   * @param portNumber The port number to create the address at.   * @param epoch DESCRIBE THE PARAMETER   * @return An InetSocketAddress at the localhost with port portNumber.   */  private EpochInetSocketAddress getEpochAddress(int portNumber, long epoch) {    EpochInetSocketAddress result = null;    result = new EpochInetSocketAddress(new InetSocketAddress(localAddress,      portNumber), epoch);    return result;  }  /**   * Way to generate a NodeHandle with a maximum timeout to receive the result.   * Helper funciton for using the non-blocking version. However this method   * behaves as a blocking call.   *   * @param address   * @param timeout maximum time in millis to return the result. <= 0 will use   *      the blocking version.   * @return   */  public NodeHandle generateNodeHandle(InetSocketAddress address, int timeout) {    if (timeout <= 0) {      return generateNodeHandle(address);    }    TimerContinuation c = new TimerContinuation();    CancellableTask task = generateNodeHandle(address, c);    if (task == null) {      return null;    }    synchronized (c) {      try {        c.wait(timeout);      } catch (InterruptedException ie) {        return null;      }    }    task.cancel();    if (logger.level <= Logger.FINER) {      logger.log("SPNF.generateNodeHandle() returning " + c.ret        + " after trying to contact " + address);    }    return (NodeHandle) c.ret;  }  /**   * Method which contructs a node handle (using the socket protocol) for the   * node at address NodeHandle.   *   * @param address The address of the remote node.   * @return A NodeHandle cooresponding to that address   */  public NodeHandle generateNodeHandle(InetSocketAddress address) {    // send nodeId request to remote node, wait for response    // allocate enought bytes to read a node handle    if (logger.level <= Logger.FINE) {      logger.log("Socket: Contacting bootstrap node " + address);    }    try {      NodeIdResponseMessage rm = (NodeIdResponseMessage) getResponse(address,        new NodeIdRequestMessage());      return new SocketNodeHandle(new EpochInetSocketAddress(address, rm.getEpoch()), rm.getNodeId());    } catch (IOException e) {      if (logger.level <= Logger.FINE) {        logger.logException("Error connecting to address " + address + ": ", e);      } else {        if (logger.level <= Logger.WARNING) {          logger.log("Error connecting to address " + address + ": " + e);        }      }      return null;    }  }  /**   * DESCRIBE THE METHOD   *   * @param address DESCRIBE THE PARAMETER   * @param c DESCRIBE THE PARAMETER   * @return DESCRIBE THE RETURN VALUE   */  public CancellableTask generateNodeHandle(final InetSocketAddress address,                                                  final Continuation c) {    if (logger.level <= Logger.FINE) {      logger.log("Socket: Contacting bootstrap node " + address);    }    return getResponse(address, new NodeIdRequestMessage(),      new Continuation() {        public void receiveResult(Object result) {          NodeIdResponseMessage rm = (NodeIdResponseMessage) result;          c.receiveResult(new SocketNodeHandle(new EpochInetSocketAddress(            address, rm.getEpoch()), rm.getNodeId()));        }        public void receiveException(Exception result) {          if (logger.level <= Logger.WARNING) {            logger.log("Error connecting to address " + address + ": " + result);          }          c.receiveException(result);        }      });  }  /**   * Method which creates a Pastry node from the next port with a randomly   * generated NodeId.   *   * @param bootstrap Node handle to bootstrap from.   * @return A node with a random ID and next port number.   */  public PastryNode newNode(NodeHandle bootstrap) {    // if (bootstrap == null) {    // return newNode(bootstrap, NodeId.buildNodeId());    // }    return newNode(bootstrap, nidFactory.generateNodeId());  }  /**   * Method which creates a Pastry node from the next port with a randomly

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -