⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nioselectsource.java

📁 The Staged Event-Driven Architecture (SEDA) is a new design for building scalable Internet services.
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
    for (int i = 0; i < ret.length; i++) {      ret[i] = new NIOSelectorQueueElement(ready[ready_offset++]);    }    return ret;  }  /**    * Dequeues at most <tt>num</tt> elements which are ready from the    * SelectSource. Returns null if no entries available.   */  public QueueElementIF[] dequeue(int num) {    if (selector.keys().size() == 0) return null;    if ((ready_size == 0) || (ready_offset == ready_size)) {      doPoll(0);    }     if (ready_size == 0) return null;    int numtoret = Math.min(ready_size - ready_offset, num);    NIOSelectorQueueElement ret[] = new NIOSelectorQueueElement[numtoret];    for (int i = 0; i < numtoret; i++) {      ret[i] = new NIOSelectorQueueElement(ready[ready_offset++]);    }    return ret;  }  /**   * Dequeue the next element from the SelectSource. Blocks up to    * timeout_millis milliseconds; returns null if no entries available   * after that time. A timeout of -1 blocks forever.   */  public QueueElementIF blocking_dequeue(int timeout_millis) {    if (DEBUG) System.err.println("NIOSelectSource ("+name+"): blocking_dequeue called");    synchronized (blocker) {      if (selector.keys().size() == 0) {	if (DEBUG) System.err.println("No keys in selector");	if (timeout_millis == 0) return null;        // Wait for something to be registered	if (timeout_millis == -1) {	  try {	    blocker.wait();	  } catch (InterruptedException ie) {	  }	} else {	  try {	    blocker.wait(timeout_millis);	  } catch (InterruptedException ie) {	  }	}      }    }    if ((ready_size == 0) || (ready_offset == ready_size)) {      doPoll(timeout_millis);    }     if (ready_size == 0) {        if (DEBUG) System.err.println("still no ready");        return null;    }    return new NIOSelectorQueueElement(ready[ready_offset++]);  }  /**   * Dequeue a set of elements from the SelectSource. Blocks up to    * timeout_millis milliseconds; returns null if no entries available   * after that time. A timeout of -1 blocks forever.   */  public QueueElementIF[] blocking_dequeue_all(int timeout_millis) {    if (DEBUG) System.err.println("NIOSelectSource ("+name+"): blocking_dequeue_all called");    /* have to do this to retain same semantics as before       nio expects 0 for indefinite.  there is no way to say       don't block at all, so hopefully 1ms isn't noticable to people */    synchronized (blocker) {      if (selector.keys().size() == 0) {	if (DEBUG) System.err.println("!!!!no keys");	if (timeout_millis == 0) return null;	// Wait for something to be registered	if (timeout_millis == -1) {	  try {	    blocker.wait();	  } catch (InterruptedException ie) {	  }	} else {	  try {	    blocker.wait(timeout_millis);	  } catch (InterruptedException ie) {	  }	}      }    }    if ((ready_size == 0) || (ready_offset == ready_size)) {      doPoll(timeout_millis);    }     if (DEBUG) System.err.println("!!!!ready_size=" + ready_size);    if (ready_size == 0) return null;    if (DEBUG) System.err.println("!!!!ready_size-ready_offset=" + (ready_size - ready_offset));    NIOSelectorQueueElement ret[] =      new NIOSelectorQueueElement[ready_size-ready_offset];    for (int i = 0; i < ret.length; i++) {      if (DEBUG) System.err.println("ret["+i+"] = " + ready[ready_offset]);      ret[i] = new NIOSelectorQueueElement(ready[ready_offset++]);    }    return ret;  }  /**   * Dequeue a set of elements from the SelectSource. Blocks up to    * timeout_millis milliseconds; returns null if no entries available   * after that time. A timeout of -1 blocks forever.   */  public QueueElementIF[] blocking_dequeue(int timeout_millis, int num) {    if (DEBUG) System.err.println("NIOSelectSource ("+name+"): blocking_dequeue called");    synchronized (blocker) {      if (selector.keys().size() == 0) {      	if (timeout_millis == 0) return null;       	// Wait for something to be registered	if (timeout_millis == -1) {	  try {	    blocker.wait();	  } catch (InterruptedException ie) {	  }	} else {	  try {	    blocker.wait(timeout_millis);	  } catch (InterruptedException ie) {	  }	}      }    }    if ((ready_size == 0) || (ready_offset == ready_size)) {      doPoll(timeout_millis);    }     if (ready_size == 0) return null;    int numtoret = Math.min(ready_size - ready_offset, num);    NIOSelectorQueueElement ret[] = new NIOSelectorQueueElement[numtoret];    for (int i = 0; i < numtoret; i++) {      ret[i] = new NIOSelectorQueueElement(ready[ready_offset++]);    }    return ret;  }  // Actually performs the poll and sets ready[], ready_off, ready_size  //  // XXX MDW: There is a race condition here. If multiple threads  // call doPoll (e.g., through dequeue()), then the ready set can   // get corrupted. The fix is to make this method synchronized, but  // this would cause a blocking dequeue() to stall all other (possibly  // nonblocking) dequeues until the timeout of the longest blocking  // dequeue. I don't see an easy way around this problem since it's  // the selector.selectedKeys() set that changes with each call to  // selector.select() or selector.selectNow(). The answer is: This class  // is not thread-safe!  private void doPoll(int timeout) {    if (DEBUG) System.err.println("NIOSelectSource ("+name+"): Doing poll, timeout "+timeout);    int c = 0;    try {        // to correct for changed semantics in nio from nbio.        // use selectNow to not block, and select(0) for indefinite block        if (timeout == 0) {            c = selector.selectNow();        } else {            if (timeout == -1) timeout = 0;            c = selector.select(timeout);        }    } catch (IOException e) {      // Essentially ignore the exception (since NBIO SelectSet.select()      // doesn't throw any exceptions)      if (DEBUG) System.err.println("NIOSelectSource: Error doing select: " + e);    }    if (DEBUG) System.err.println("NIOSelectSource ("+name+"): poll returned "+c);    Set skeys = selector.selectedKeys();    if (skeys.size() > 0) {      SelectionKey ret[] = new SelectionKey[skeys.size()];      Iterator key_iter = skeys.iterator();      int j = 0;      while (key_iter.hasNext()) {	ret[j] = (SelectionKey)key_iter.next();	key_iter.remove();	//selector.selectedKeys().remove(ret[j]);	j++;      }      if (ret.length != 0) {	// XXX We can't get ret == null if doPoll() is synchronized with 	// deregister() - but I'm not sure I want to do that	ready_offset = 0; ready_size = ret.length;	balance(ret);	return;      }    }    // Didn't get anything    ready = null; ready_offset = ready_size = 0;  }  // Balances selarr[] by shuffling the entries - sets ready[]  private void balance(SelectionKey selarr[]) {    if (DEBUG) System.err.println("NIOSelectSource ("+name+"): balance called, selarr size="+selarr.length);    for (int i = 0; i < selarr.length; i++)        if (DEBUG) System.err.println("!!!!selar["+i+"] = " + selarr[i]);    if ((!do_balance) || (selarr.length < 2)) {      ready = selarr;    } else {      SelectionKey a;      ready = new SelectionKey[selarr.length];      for (int i = 0; i < ready.length; i++) {      	if (balancer_seq_off == BALANCER_SEQUENCE_SIZE) {     	  balancer_seq_off = 0;      	}       	int n = balancer_seq[balancer_seq_off++] % selarr.length;	int c = 0;	while (selarr[n] == null) {	  n++; c++;	  if (n == selarr.length) n = 0;	  if (c == selarr.length) {	    System.err.println("WARNING: NIOSelectSource.balance(): All items in selarr are null (n="+n+", c="+c+", len="+selarr.length);	    for (int k = 0; k < ready.length; k++) {	      System.err.println("["+k+"] ready:"+ready[k]+" selarr:"+selarr[k]);	    }	    throw new IllegalArgumentException("balance: All items in selarr are null! This is a bug - please contact mdw@cs.berkeley.edu");	  }	}	if (DEBUG) System.err.println("NIOSelectSource: balance: "+n+"->"+i);	a = selarr[n]; selarr[n] = null; ready[i] = a;      }    }  }  // Initialize the balancer  private void initBalancer() {    balancer_seq = new int[BALANCER_SEQUENCE_SIZE];    Random r = new Random(); // XXX Need better seed?    for (int i = 0; i < BALANCER_SEQUENCE_SIZE; i++) {      balancer_seq[i] = Math.abs(r.nextInt());    }    balancer_seq_off = 0;  }  void setName(String thename) {    this.name = thename;  }  public String toString() {    return "NIOSS("+name+")";  }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -