⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socketappsocket.java

📁 pastry的java实现的2.0b版
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
   */
  public synchronized void modifyKey(SelectionKey key) {
    // Rebuild the interest set from scratch based on the current state.
    int flag = 0;
    if (reader != null) {
      // a receiver is waiting to be told the socket is readable
      flag |= SelectionKey.OP_READ;
    }
    if (writer != null) {
      // a receiver is waiting to be told the socket is writable
      flag |= SelectionKey.OP_WRITE;
    }
    if (toWrite != null) {
      // internal bytes are still pending in toWrite
      flag |= SelectionKey.OP_WRITE;
    }
    if (connectResult == CONNECTION_UNKNOWN) {
      // still waiting for the connect result byte (see read(SelectionKey))
      flag |= SelectionKey.OP_READ;
    }
    key.interestOps(flag);
  }

  /**
   * Specified by the SelectionKeyHandler interface - calling this tells this
   * socket manager that the connection has completed and we can now read/write.
   *
   * @param key The key which is connectable.
   */
  public void connect(SelectionKey key) {
    try {
      // deregister interest in connecting to this socket
      if (channel.finishConnect()) {
        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
      }

      if (manager.logger.level <= Logger.FINE) {
        manager.logger.log("(SM) Found connectable channel - completed connection");
      }
    } catch (Exception e) {
      if (manager.logger.level <= Logger.FINE) {
        manager.logger.logException(
          "(SM) Unable to connect to " + this, e);
      }
      exceptionAndClose(e);
    }
  }

  /**
   * Reads from the socket attached to this connector.
   *
   * While the connect result is still CONNECTION_UNKNOWN, a single status byte
   * is read from the channel and dispatched: CONNECTION_OK hands this socket to
   * the receiver, any other value is reported to the receiver as an exception
   * and the socket is closed. Once the connect result is known, the registered
   * reader (if any) is notified that the socket is readable.
   *
   * @param key The selection key for this manager
   */
  public void read(SelectionKey key) {
    if (connectResult == CONNECTION_UNKNOWN) {
      try {
        ByteBuffer answer = ByteBuffer.allocate(1);
        int bytesRead = ((SocketChannel) key.channel()).read(answer);

        if (bytesRead < 0) {
          // End of stream: the remote side closed before sending its status
          // byte. Do not interpret the untouched buffer as a result.
          exceptionAndClose(
            new IOException("Connection closed before the connect result was received"));
          return;
        }
        if (bytesRead == 0) {
          // Nothing available yet; keep the timer and the read interest and
          // wait for the next readable event.
          return;
        }

        // A status byte arrived, so the connect timeout no longer applies.
        clearTimer(receiver);
        manager.pastryNode.getEnvironment().getSelectorManager().modifyKey(key);

        answer.flip();
        connectResult = answer.get();

        switch (connectResult) {
          case CONNECTION_OK:
            // on connector side
            receiver.receiveSocket(this);
            return;
          case CONNECTION_NO_APP:
            exceptionAndClose(new AppNotRegisteredException());
            return;
          case CONNECTION_NO_ACCEPTOR:
            exceptionAndClose(new NoReceiverAvailableException());
            return;
          default:
            exceptionAndClose(new AppSocketException("Unknown error " + connectResult));
            return;
        }
      } catch (IOException ioe) {
        exceptionAndClose(ioe);
      }
      return;
    }

    AppSocketReceiver temp = reader;
    if (temp == null) {
      // Readable event with nobody registered for reads: just refresh the
      // interest set instead of dereferencing null.
      manager.pastryNode.getEnvironment().getSelectorManager().modifyKey(key);
      return;
    }

    // The registration is one-shot: clear it before calling back so the
    // receiver may register again from inside the callback.
    clearTimer(temp);
    reader = null;
    temp.receiveSelectResult(this, true, false);
    manager.pastryNode.getEnvironment().getSelectorManager().modifyKey(key);
  }

  /**
   * Starts a timeout for the given receiver. If the timer is not cleared before
   * it fires, the receiver is handed a TimeoutException and this socket is
   * closed. Any timer previously stored for the same receiver is cancelled
   * first.
   *
   * @param millis delay handed to the selector manager's timer; values less
   *      than or equal to zero mean "no timeout" and nothing is scheduled
   * @param theReceiver the receiver to notify when the timeout fires
   */
  private void startTimer(int millis, final AppSocketReceiver theReceiver) {
    if (millis <= 0) {
      return;
    }

    clearTimer(theReceiver);

    TimerTask timer =
      new TimerTask() {
        public void run() {
          timers.remove(theReceiver);
          theReceiver.receiveException(SocketAppSocket.this, new TimeoutException());
          close();
        }
      };

    timers.put(theReceiver, timer);
    manager.pastryNode.getEnvironment().getSelectorManager().getTimer().schedule(timer, millis);
  }

  /**
   * Removes and cancels the timer stored for the given receiver, if there is
   * one.
   *
   * @param theReceiver the receiver whose timer should be cancelled
   */
  private void clearTimer(AppSocketReceiver theReceiver) {
    TimerTask timer = (TimerTask) timers.remove(theReceiver);
    if (timer != null) {
      timer.cancel();
    }
  }

  /**
   * Cancels the receiver's timer, reports the exception to the receiver and
   * closes this socket.
   *
   * @param e the exception to hand to the receiver
   */
  private void exceptionAndClose(Exception e) {
    clearTimer(receiver);
    receiver.receiveException(SocketAppSocket.this, e);
    close();
  }

  /**
   * Writes to the socket attached to this socket manager.
   *
   * If internal bytes are pending in toWrite they are written first; otherwise
   * the registered writer (if any) is notified that the socket is writable.
   *
   * @param key The selection key for this manager
   */
  public synchronized void write(SelectionKey key) {
    if (toWrite != null) {
      try {
        ((SocketChannel) key.channel()).write(toWrite);
      } catch (IOException ioe) {
        exceptionAndClose(ioe);
        // The socket has just been closed; do not touch the key any further.
        return;
      }

      if (!toWrite.hasRemaining()) {
        toWrite = null;
        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
        // The receiver is notified from read(SelectionKey) once the connect
        // result has been received, not from here.
      }
      return;
    }

    AppSocketReceiver temp = writer;
    if (temp == null) {
      // Writable event with nobody registered for writes: just refresh the
      // interest set instead of dereferencing null.
      manager.pastryNode.getEnvironment().getSelectorManager().modifyKey(key);
      return;
    }

    // The registration is one-shot: clear it before calling back so the
    // receiver may register again from inside the callback.
    clearTimer(temp);
    writer = null;
    temp.receiveSelectResult(this, false, true);
    manager.pastryNode.getEnvironment().getSelectorManager().modifyKey(key);
  }

  /**
   * Accepts a new connection on the given key.
   *
   * The channel is registered with the selector manager, the pastry node is
   * asked to accept this socket for appId, and a single status byte describing
   * the outcome is queued in toWrite to be sent back to the connecter.
   *
   * @param key the key whose channel is the accepted connection
   * @exception IOException declared by the signature; propagated from the
   *      calls made here if any of them throws it
   */
  protected void acceptConnection(SelectionKey key) throws IOException {
    connectResult = CONNECTION_OK;
    this.channel = (SocketChannel) key.channel();
    this.key = manager.pastryNode.getEnvironment().getSelectorManager().register(key.channel(), this, 0);

    // lookup acceptor; the outcome becomes the single status byte we send back
    toWrite = ByteBuffer.allocate(1);
    try {
      manager.pastryNode.acceptAppSocket(this, appId);
      toWrite.put(CONNECTION_OK);
    } catch (AppNotRegisteredException anre) {
      if (manager.logger.level <= Logger.WARNING) {
        manager.logger.logException("Sending error to connecter " + channel + " ", anre);
      }
      toWrite.put(CONNECTION_NO_APP);
    } catch (NoReceiverAvailableException nrae) {
      if (manager.logger.level <= Logger.WARNING) {
        manager.logger.logException("Sending error to connecter " + channel + " ", nrae);
      }
      toWrite.put(CONNECTION_NO_ACCEPTOR);
    } catch (AppSocketException ase) {
      if (manager.logger.level <= Logger.WARNING) {
        manager.logger.logException("Sending error to connecter " + channel + " ", ase);
      }
      toWrite.put(CONNECTION_UNKNOWN_ERROR);
    }

    // Switch the buffer from filling to draining so write(SelectionKey) sends
    // the byte that was just put.
    toWrite.flip();

    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);

    if (manager.logger.level <= Logger.FINE) {
      manager.logger.log(
        "(SM) Accepted app connection from " +
        channel.socket().getRemoteSocketAddress());
    }
  }

  /**
   * Creates the outgoing socket to the remote handle.
   *
   * Opens a non-blocking channel, registers it with the selector manager and
   * starts connecting to the address of the first hop of the given path.
   *
   * @param path the source route; only its first hop's address is used here
   * @exception IOException if opening, configuring or connecting the channel
   *      fails
   */
  protected void createConnection(final SourceRoute path) throws IOException {
    this.channel = SocketChannel.open();
    this.channel.socket().setSendBufferSize(manager.SOCKET_BUFFER_SIZE);
    this.channel.socket().setReceiveBufferSize(manager.SOCKET_BUFFER_SIZE);
    this.channel.configureBlocking(false);
    this.key = manager.pastryNode.getEnvironment().getSelectorManager().register(channel, this, 0);

    if (manager.logger.level <= Logger.FINE) {
      manager.logger.log("(SM) Initiating socket connection to path " + path);
    }

    manager.pastryNode.broadcastChannelOpened(path.getFirstHop().getAddress(), NetworkListener.REASON_APP_SOCKET_NORMAL);

    if (this.channel.connect(path.getFirstHop().getAddress())) {
      // connected immediately: no need to wait for OP_CONNECT
      this.key.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ);
    } else {
      // connection still in progress: connect(SelectionKey) finishes it
      this.key.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ | SelectionKey.OP_CONNECT);
    }
  }

  /**
   * Reads from the underlying channel into the given buffers by delegating to
   * the channel's scattering read.
   *
   * @param dsts the buffers to read into
   * @param offset index of the first buffer in dsts to use
   * @param length maximum number of buffers to use
   * @return the value returned by the channel's read
   * @exception IOException if the channel's read throws it
   */
  public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
    return channel.read(dsts, offset, length);
  }

  /**
   * Writes the given buffers to the underlying channel by delegating to the
   * channel's gathering write.
   *
   * @param srcs the buffers to write from
   * @param offset index of the first buffer in srcs to use
   * @param length maximum number of buffers to use
   * @return the value returned by the channel's write
   * @exception IOException if the channel's write throws it
   */
  public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
    return channel.write(srcs, offset, length);
  }

  /**
   * Registers a receiver to be notified when this socket becomes readable
   * and/or writable, optionally with a timeout.
   *
   * @param wantToRead true to store the receiver as the reader
   * @param wantToWrite true to store the receiver as the writer
   * @param timeout timeout passed to startTimer; values less than or equal to
   *      zero start no timer
   * @param receiver the receiver to notify
   */
  public void register(boolean wantToRead, boolean wantToWrite, int timeout, AppSocketReceiver receiver) {
    if (wantToRead) {
      reader = receiver;
    }
    if (wantToWrite) {
      writer = receiver;
    }

    startTimer(timeout, receiver);
    manager.pastryNode.getEnvironment().getSelectorManager().modifyKey(key);
  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -