📄 selectormanager.java
字号:
// now, hack to make sure that all cancelled keys are actually // cancelled (dumb) selector.selectNow(); } } } } catch (Throwable t) { if (logger.level <= Logger.SEVERE) { logger.logException( "ERROR (SelectorManager.run): ", t); } System.exit(-1); } if (logger.level <= Logger.INFO) { logger.log("Selector " + instance + " shutting down."); } } /** * DESCRIBE THE METHOD */ public void destroy() { running = false; } /** * DESCRIBE THE METHOD */ protected void notifyLoopListeners() { long now = timeSource.currentTimeMillis(); long diff = now - lastTime; // notify observers synchronized (loopObservers) { Iterator i = loopObservers.iterator(); while (i.hasNext()) { LoopObserver lo = (LoopObserver) i.next(); if (lo.delayInterest() >= diff) { lo.loopTime((int) diff); } } } lastTime = now; } /** * Adds a feature to the LoopObserver attribute of the SelectorManager object * * @param lo The feature to be added to the LoopObserver attribute */ public void addLoopObserver(LoopObserver lo) { synchronized (loopObservers) { loopObservers.add(lo); } } /** * DESCRIBE THE METHOD * * @param lo DESCRIBE THE PARAMETER */ public void removeLoopObserver(LoopObserver lo) { synchronized (loopObservers) { loopObservers.remove(lo); } } /** * DESCRIBE THE METHOD * * @exception IOException DESCRIBE THE EXCEPTION */ protected void doSelections() throws IOException { SelectionKey[] keys = selectedKeys(); // to debug weird selection bug if (keys.length > 1000 && logger.level <= Logger.FINE) { logger.log("lots of selection keys!"); HashMap histo = new HashMap(); for (int i = 0; i < keys.length; i++) { String keyclass = keys[i].getClass().getName(); if (histo.containsKey(keyclass)) { histo.put(keyclass, new Integer(((Integer) histo.get(keyclass)).intValue() + 1)); } else { histo.put(keyclass, new Integer(1)); } } logger.log("begin selection keys by class"); Iterator it = histo.keySet().iterator(); while (it.hasNext()) { String name = (String) it.next(); logger.log("Selection Key: " + name + ": " + histo.get(name)); } logger.log("end selection keys by class"); } for (int i = 0; i < keys.length; i++) { selector.selectedKeys().remove(keys[i]); synchronized (keys[i]) { SelectionKeyHandler skh = (SelectionKeyHandler) keys[i].attachment(); if (skh != null) { // accept if (keys[i].isValid() && keys[i].isAcceptable()) { skh.accept(keys[i]); } // connect if (keys[i].isValid() && keys[i].isConnectable()) { skh.connect(keys[i]); } // read if (keys[i].isValid() && keys[i].isReadable()) { skh.read(keys[i]); } // write if (keys[i].isValid() && keys[i].isWritable()) { skh.write(keys[i]); } } else { keys[i].channel().close(); keys[i].cancel(); } } } } /** * Method which invokes all pending invocations. This method should *only* be * called by the selector thread. */ protected void doInvocations() { Iterator i; synchronized (this) { i = new ArrayList(invocations).iterator(); invocations.clear(); } while (i.hasNext()) { Runnable run = (Runnable) i.next(); try { run.run(); } catch (Exception e) { if (logger.level <= Logger.SEVERE) { logger.logException( "Invoking runnable caused exception " + e + " - continuing", e); } } } synchronized (this) { i = new ArrayList(modifyKeys).iterator(); modifyKeys.clear(); } while (i.hasNext()) { SelectionKey key = (SelectionKey) i.next(); if (key.isValid() && (key.attachment() != null)) { ((SelectionKeyHandler) key.attachment()).modifyKey(key); } } } /** * Selects on the selector, and returns the result. Also properly synchronizes * around the selector * * @param time DESCRIBE THE PARAMETER * @return DESCRIBE THE RETURN VALUE * @exception IOException DESCRIBE THE EXCEPTION */ int select(int time) throws IOException { if (time > TIMEOUT) { time = TIMEOUT; } try { if ((time <= 0) || (invocations.size() > 0) || (modifyKeys.size() > 0)) { return selector.selectNow(); } wakeupTime = timeSource.currentTimeMillis() + time; return selector.select(time); } catch (CancelledKeyException cce) { if (logger.level <= Logger.WARNING) { logger.logException("CCE: cause:", cce.getCause()); } throw cce; } catch (IOException e) { if (e.getMessage().indexOf("Interrupted system call") >= 0) { if (logger.level <= Logger.WARNING) { logger.log("Got interrupted system call, continuing anyway..."); } return 1; } else { throw e; } } } /** * Selects all of the keys on the selector and returns the result as an array * of keys. * * @return The array of keys * @exception IOException DESCRIBE THE EXCEPTION */ private SelectionKey[] keys() throws IOException { return (SelectionKey[]) selector.keys().toArray(new SelectionKey[0]); } /** * Selects all of the currenlty selected keys on the selector and returns the * result as an array of keys. * * @return The array of keys * @exception IOException DESCRIBE THE EXCEPTION */ protected SelectionKey[] selectedKeys() throws IOException { return (SelectionKey[]) selector.selectedKeys() .toArray(new SelectionKey[0]); } /** * Method which schedules a task to run after a specified number of millis * * @param task The task to run * @param delay The delay before running, in milliseconds */ public void schedule(TimerTask task, long delay) { task.nextExecutionTime = timeSource.currentTimeMillis() + delay; addTask(task); } /** * Method which schedules a task to run repeatedly after a specified delay and * period * * @param task The task to run * @param delay The delay before first running, in milliseconds * @param period The period with which to run in milliseconds */ public void schedule(TimerTask task, long delay, long period) { task.nextExecutionTime = timeSource.currentTimeMillis() + delay; task.period = (int) period; addTask(task); } /** * Method which schedules a task to run repeatedly (at a fixed rate) after a * specified delay and period * * @param task The task to run * @param delay The delay before first running in milliseconds * @param period The period with which to run in milliseconds */ public void scheduleAtFixedRate(TimerTask task, long delay, long period) { task.nextExecutionTime = timeSource.currentTimeMillis() + delay; task.period = (int) period; task.fixedRate = true; addTask(task); } /** * Internal method which adds a task to the task tree, waking up the selector * if necessary to recalculate the sleep time * * @param task The task to add */ private void addTask(TimerTask task) { synchronized (selector) { if (!timerQueue.add(task)) { System.out.println("ERROR: Got false while enqueueing task " + task + "!"); Thread.dumpStack(); } } // need to interrupt thread if waiting too long in selector if (wakeupTime >= task.scheduledExecutionTime()) { selector.wakeup(); } } /** * Internal method which finds all due tasks and executes them. */ protected void executeDueTasks() { //System.out.println("SM.executeDueTasks()"); long now = timeSource.currentTimeMillis(); ArrayList executeNow = new ArrayList(); // step 1, fetch all due timers synchronized (selector) { boolean done = false; while (!done) { if (timerQueue.size() > 0) { TimerTask next = (TimerTask) timerQueue.first(); if (next.nextExecutionTime <= now) { executeNow.add(next); //System.out.println("Removing:"+next); timerQueue.remove(next); } else { done = true; } } else { done = true; } } } // step 2, execute them all // items to be added back into the queue ArrayList addBack = new ArrayList(); Iterator i = executeNow.iterator(); while (i.hasNext()) { TimerTask next = (TimerTask) i.next(); try { //System.out.println("SM.Executing "+next); if (next.execute(timeSource)) { addBack.add(next); } } catch (Exception e) { if (logger.level <= Logger.SEVERE) { logger.logException("", e); } } } // step 3, add them back if necessary synchronized (selector) { i = addBack.iterator(); while (i.hasNext()) { TimerTask tt = (TimerTask) i.next(); //System.out.println("SM.addBack("+tt+")"); timerQueue.add(tt); } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -