📄 nioselectsource.java
字号:
for (int i = 0; i < ret.length; i++) { ret[i] = new NIOSelectorQueueElement(ready[ready_offset++]); } return ret; } /** * Dequeues at most <tt>num</tt> elements which are ready from the * SelectSource. Returns null if no entries available. */ public QueueElementIF[] dequeue(int num) { if (selector.keys().size() == 0) return null; if ((ready_size == 0) || (ready_offset == ready_size)) { doPoll(0); } if (ready_size == 0) return null; int numtoret = Math.min(ready_size - ready_offset, num); NIOSelectorQueueElement ret[] = new NIOSelectorQueueElement[numtoret]; for (int i = 0; i < numtoret; i++) { ret[i] = new NIOSelectorQueueElement(ready[ready_offset++]); } return ret; } /** * Dequeue the next element from the SelectSource. Blocks up to * timeout_millis milliseconds; returns null if no entries available * after that time. A timeout of -1 blocks forever. */ public QueueElementIF blocking_dequeue(int timeout_millis) { if (DEBUG) System.err.println("NIOSelectSource ("+name+"): blocking_dequeue called"); synchronized (blocker) { if (selector.keys().size() == 0) { if (DEBUG) System.err.println("No keys in selector"); if (timeout_millis == 0) return null; // Wait for something to be registered if (timeout_millis == -1) { try { blocker.wait(); } catch (InterruptedException ie) { } } else { try { blocker.wait(timeout_millis); } catch (InterruptedException ie) { } } } } if ((ready_size == 0) || (ready_offset == ready_size)) { doPoll(timeout_millis); } if (ready_size == 0) { if (DEBUG) System.err.println("still no ready"); return null; } return new NIOSelectorQueueElement(ready[ready_offset++]); } /** * Dequeue a set of elements from the SelectSource. Blocks up to * timeout_millis milliseconds; returns null if no entries available * after that time. A timeout of -1 blocks forever. */ public QueueElementIF[] blocking_dequeue_all(int timeout_millis) { if (DEBUG) System.err.println("NIOSelectSource ("+name+"): blocking_dequeue_all called"); /* have to do this to retain same semantics as before nio expects 0 for indefinite. there is no way to say don't block at all, so hopefully 1ms isn't noticable to people */ synchronized (blocker) { if (selector.keys().size() == 0) { if (DEBUG) System.err.println("!!!!no keys"); if (timeout_millis == 0) return null; // Wait for something to be registered if (timeout_millis == -1) { try { blocker.wait(); } catch (InterruptedException ie) { } } else { try { blocker.wait(timeout_millis); } catch (InterruptedException ie) { } } } } if ((ready_size == 0) || (ready_offset == ready_size)) { doPoll(timeout_millis); } if (DEBUG) System.err.println("!!!!ready_size=" + ready_size); if (ready_size == 0) return null; if (DEBUG) System.err.println("!!!!ready_size-ready_offset=" + (ready_size - ready_offset)); NIOSelectorQueueElement ret[] = new NIOSelectorQueueElement[ready_size-ready_offset]; for (int i = 0; i < ret.length; i++) { if (DEBUG) System.err.println("ret["+i+"] = " + ready[ready_offset]); ret[i] = new NIOSelectorQueueElement(ready[ready_offset++]); } return ret; } /** * Dequeue a set of elements from the SelectSource. Blocks up to * timeout_millis milliseconds; returns null if no entries available * after that time. A timeout of -1 blocks forever. */ public QueueElementIF[] blocking_dequeue(int timeout_millis, int num) { if (DEBUG) System.err.println("NIOSelectSource ("+name+"): blocking_dequeue called"); synchronized (blocker) { if (selector.keys().size() == 0) { if (timeout_millis == 0) return null; // Wait for something to be registered if (timeout_millis == -1) { try { blocker.wait(); } catch (InterruptedException ie) { } } else { try { blocker.wait(timeout_millis); } catch (InterruptedException ie) { } } } } if ((ready_size == 0) || (ready_offset == ready_size)) { doPoll(timeout_millis); } if (ready_size == 0) return null; int numtoret = Math.min(ready_size - ready_offset, num); NIOSelectorQueueElement ret[] = new NIOSelectorQueueElement[numtoret]; for (int i = 0; i < numtoret; i++) { ret[i] = new NIOSelectorQueueElement(ready[ready_offset++]); } return ret; } // Actually performs the poll and sets ready[], ready_off, ready_size // // XXX MDW: There is a race condition here. If multiple threads // call doPoll (e.g., through dequeue()), then the ready set can // get corrupted. The fix is to make this method synchronized, but // this would cause a blocking dequeue() to stall all other (possibly // nonblocking) dequeues until the timeout of the longest blocking // dequeue. I don't see an easy way around this problem since it's // the selector.selectedKeys() set that changes with each call to // selector.select() or selector.selectNow(). The answer is: This class // is not thread-safe! private void doPoll(int timeout) { if (DEBUG) System.err.println("NIOSelectSource ("+name+"): Doing poll, timeout "+timeout); int c = 0; try { // to correct for changed semantics in nio from nbio. // use selectNow to not block, and select(0) for indefinite block if (timeout == 0) { c = selector.selectNow(); } else { if (timeout == -1) timeout = 0; c = selector.select(timeout); } } catch (IOException e) { // Essentially ignore the exception (since NBIO SelectSet.select() // doesn't throw any exceptions) if (DEBUG) System.err.println("NIOSelectSource: Error doing select: " + e); } if (DEBUG) System.err.println("NIOSelectSource ("+name+"): poll returned "+c); Set skeys = selector.selectedKeys(); if (skeys.size() > 0) { SelectionKey ret[] = new SelectionKey[skeys.size()]; Iterator key_iter = skeys.iterator(); int j = 0; while (key_iter.hasNext()) { ret[j] = (SelectionKey)key_iter.next(); key_iter.remove(); //selector.selectedKeys().remove(ret[j]); j++; } if (ret.length != 0) { // XXX We can't get ret == null if doPoll() is synchronized with // deregister() - but I'm not sure I want to do that ready_offset = 0; ready_size = ret.length; balance(ret); return; } } // Didn't get anything ready = null; ready_offset = ready_size = 0; } // Balances selarr[] by shuffling the entries - sets ready[] private void balance(SelectionKey selarr[]) { if (DEBUG) System.err.println("NIOSelectSource ("+name+"): balance called, selarr size="+selarr.length); for (int i = 0; i < selarr.length; i++) if (DEBUG) System.err.println("!!!!selar["+i+"] = " + selarr[i]); if ((!do_balance) || (selarr.length < 2)) { ready = selarr; } else { SelectionKey a; ready = new SelectionKey[selarr.length]; for (int i = 0; i < ready.length; i++) { if (balancer_seq_off == BALANCER_SEQUENCE_SIZE) { balancer_seq_off = 0; } int n = balancer_seq[balancer_seq_off++] % selarr.length; int c = 0; while (selarr[n] == null) { n++; c++; if (n == selarr.length) n = 0; if (c == selarr.length) { System.err.println("WARNING: NIOSelectSource.balance(): All items in selarr are null (n="+n+", c="+c+", len="+selarr.length); for (int k = 0; k < ready.length; k++) { System.err.println("["+k+"] ready:"+ready[k]+" selarr:"+selarr[k]); } throw new IllegalArgumentException("balance: All items in selarr are null! This is a bug - please contact mdw@cs.berkeley.edu"); } } if (DEBUG) System.err.println("NIOSelectSource: balance: "+n+"->"+i); a = selarr[n]; selarr[n] = null; ready[i] = a; } } } // Initialize the balancer private void initBalancer() { balancer_seq = new int[BALANCER_SEQUENCE_SIZE]; Random r = new Random(); // XXX Need better seed? for (int i = 0; i < BALANCER_SEQUENCE_SIZE; i++) { balancer_seq[i] = Math.abs(r.nextInt()); } balancer_seq_off = 0; } void setName(String thename) { this.name = thename; } public String toString() { return "NIOSS("+name+")"; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -