📄 scanmanager.java
字号:
} // See ScanManagerMXBean public ScanDirConfigMXBean getConfigurationMBean() { return config; } // Creates and registers a new directory scanner. // Called by applyConfiguration. // throws IllegalStateException if state is not STOPPED or COMPLETED // (you cannot change the config while scanning is scheduled or running). // private DirectoryScannerMXBean addDirectoryScanner( DirectoryScannerConfig bean) throws JMException { try { final DirectoryScannerMXBean scanner; final ObjectName scanName; synchronized (this) { if (state != STOPPED && state != COMPLETED) throw new IllegalStateException(state.toString()); scanner = createDirectoryScanner(bean); scanName = makeDirectoryScannerName(bean.getName()); } LOG.fine("server: "+mbeanServer); LOG.fine("scanner: "+scanner); LOG.fine("scanName: "+scanName); final ObjectInstance moi = mbeanServer.registerMBean(scanner,scanName); final ObjectName moiName = moi.getObjectName(); final DirectoryScannerMXBean proxy = JMX.newMXBeanProxy(mbeanServer,moiName, DirectoryScannerMXBean.class,true); scanmap.put(moiName,proxy); return proxy; } catch (RuntimeException x) { final String msg = "Operation failed: "+x; if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST,msg,x); else LOG.fine(msg); throw x; } catch (JMException x) { final String msg = "Operation failed: "+x; if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST,msg,x); else LOG.fine(msg); throw x; } } // See ScanManagerMXBean public ScanDirConfigMXBean createOtherConfigurationMBean(String name, String filename) throws JMException { final ScanDirConfig profile = new ScanDirConfig(filename); final ObjectName profName = makeScanDirConfigName(name); final ObjectInstance moi = mbeanServer.registerMBean(profile,profName); final ScanDirConfigMXBean proxy = JMX.newMXBeanProxy(mbeanServer,profName, ScanDirConfigMXBean.class,true); configmap.put(moi.getObjectName(),proxy); return proxy; } // See ScanManagerMXBean public Map<String,DirectoryScannerMXBean> getDirectoryScanners() { final Map<String,DirectoryScannerMXBean> proxyMap = newHashMap(); for (Entry<ObjectName,DirectoryScannerMXBean> item : scanmap.entrySet()){ proxyMap.put(item.getKey().getKeyProperty("name"),item.getValue()); } return proxyMap; } // --------------------------------------------------------------- // State Management // --------------------------------------------------------------- /** * For each operation, this map stores a list of states from * which the corresponding operation can be legally called. * For instance, it is legal to call "stop" regardless of the * application state. However, "schedule" can be called only if * the application state is STOPPED, etc... **/ private final static Map<String,EnumSet<ScanState>> allowedStates; static { allowedStates = newHashMap(); // You can always call stop allowedStates.put("stop",EnumSet.allOf(ScanState.class)); // You can only call closed when stopped allowedStates.put("close",EnumSet.of(STOPPED,COMPLETED,CLOSED)); // You can call schedule only when the current task is // completed or stopped. allowedStates.put("schedule",EnumSet.of(STOPPED,COMPLETED)); // switch reserved for background task: goes from SCHEDULED to // RUNNING when it enters the run() method. allowedStates.put("scan-running",EnumSet.of(SCHEDULED)); // switch reserved for background task: goes from RUNNING to // SCHEDULED when it has completed but needs to reschedule // itself for specified interval. allowedStates.put("scan-scheduled",EnumSet.of(RUNNING)); // switch reserved for background task: // goes from RUNNING to COMPLETED upon successful completion allowedStates.put("scan-done",EnumSet.of(RUNNING)); } // Get this object's state. No need to synchronize because // state is volatile. // See ScanManagerMXBean public ScanState getState() { return state; } /** * Enqueue a state changed notification for the given states. **/ private void queueStateChangedNotification( long sequence, long time, ScanState old, ScanState current) { final AttributeChangeNotification n = new AttributeChangeNotification(SCAN_MANAGER_NAME,sequence,time, "ScanManager State changed to "+current,"State", ScanState.class.getName(),old.toString(),current.toString()); // Queue the notification. We have created an unlimited queue, so // this method should always succeed. try { if (!pendingNotifs.offer(n,2,TimeUnit.SECONDS)) { LOG.fine("Can't queue Notification: "+n); } } catch (InterruptedException x) { LOG.fine("Can't queue Notification: "+x); } } /** * Send all notifications present in the queue. **/ private void sendQueuedNotifications() { Notification n; while ((n = pendingNotifs.poll()) != null) { broadcaster.sendNotification(n); } } /** * Checks that the current state is allowed for the given operation, * and if so, switch its value to the new desired state. * This operation also enqueue the appropriate state changed * notification. **/ private ScanState switchState(ScanState desired,String forOperation) { return switchState(desired,allowedStates.get(forOperation)); } /** * Checks that the current state is one of the allowed states, * and if so, switch its value to the new desired state. * This operation also enqueue the appropriate state changed * notification. **/ private ScanState switchState(ScanState desired,EnumSet<ScanState> allowed) { final ScanState old; final long timestamp; final long sequence; synchronized(this) { old = state; if (!allowed.contains(state)) throw new IllegalStateException(state.toString()); state = desired; timestamp = System.currentTimeMillis(); sequence = getNextSeqNumber(); } LOG.fine("switched state: "+old+" -> "+desired); if (old != desired) queueStateChangedNotification(sequence,timestamp,old,desired); return old; } // --------------------------------------------------------------- // schedule() creates a new SessionTask that will be executed later // (possibly right away if delay=0) by a Timer thread. // --------------------------------------------------------------- // The timer used by this object. Lazzy evaluation. Cleaned in // postDeregister() // private Timer timer = null; // See ScanManagerMXBean public void schedule(long delay, long interval) { if (!sequencer.tryAcquire()) { throw new IllegalStateException("Can't acquire lock"); } try { LOG.fine("scheduling new task: state="+state); final ScanState old = switchState(SCHEDULED,"schedule"); final boolean scheduled = scheduleSession(new SessionTask(interval),delay); if (scheduled) LOG.fine("new task scheduled: state="+state); } finally { sequencer.release(); } sendQueuedNotifications(); } // Schedule a SessionTask. The session task may reschedule // a new identical task when it eventually ends. // We use this logic so that the 'interval' time is measured // starting at the end of the task that finishes, rather than // at its beginning. Therefore if a repeated task takes x ms, // it will be repeated every x+interval ms. // private synchronized boolean scheduleSession(SessionTask task, long delay) { if (state == STOPPED) return false; if (timer == null) timer = new Timer("ScanManager"); tasklist.add(task); timer.schedule(task,delay); return true; } // --------------------------------------------------------------- // start() is equivalent to schedule(0,0) // --------------------------------------------------------------- // See ScanManagerMXBean public void start() throws IOException, InstanceNotFoundException { schedule(0,0); } // --------------------------------------------------------------- // Methods used to implement stop() - stop() is asynchronous, // and needs to notify any running background task that it needs // to stop. It also needs to prevent scheduled task from being // run. // --------------------------------------------------------------- // See ScanManagerMXBean public void stop() { if (!sequencer.tryAcquire()) throw new IllegalStateException("Can't acquire lock"); int errcount = 0; final StringBuilder b = new StringBuilder(); try { switchState(STOPPED,"stop"); errcount += cancelSessionTasks(b); errcount += stopDirectoryScanners(b); } finally { sequencer.release(); } sendQueuedNotifications(); if (errcount > 0) { b.insert(0,"stop partially failed with "+errcount+" error(s):"); throw new RuntimeException(b.toString()); } } // See ScanManagerMXBean public void close() { switchState(CLOSED,"close"); sendQueuedNotifications(); } // Appends exception to a StringBuilder message. // private void append(StringBuilder b,String prefix,Throwable t) { final String first = (prefix==null)?"\n":"\n"+prefix; b.append(first).append(String.valueOf(t)); Throwable cause = t; while ((cause = cause.getCause())!=null) { b.append(first).append("Caused by:").append(first); b.append('\t').append(String.valueOf(cause)); } } // Cancels all scheduled session tasks // private int cancelSessionTasks(StringBuilder b) { int errcount = 0; // Stops scheduled tasks if any... // for (SessionTask task : tasklist) { try { task.cancel(); tasklist.remove(task); } catch (Exception ex) { errcount++; append(b,"\t",ex); } } return errcount; } // Stops all DirectoryScanners configured for this object. // private int stopDirectoryScanners(StringBuilder b) { int errcount = 0; // Stops directory scanners if any... // for (DirectoryScannerMXBean s : scanmap.values()) { try { s.stop(); } catch (Exception ex) { errcount++; append(b,"\t",ex); } } return errcount; } // --------------------------------------------------------------- // We start scanning in background in a Timer thread. // The methods below implement that logic. // --------------------------------------------------------------- private void scanAllDirectories() throws IOException, InstanceNotFoundException { int errcount = 0; final StringBuilder b = new StringBuilder(); for (ObjectName key : scanmap.keySet()) { final DirectoryScannerMXBean s = scanmap.get(key); try { if (state == STOPPED) return; s.scan(); } catch (Exception ex) { LOG.log(Level.FINE,key + " failed to scan: "+ex,ex); errcount++; append(b,"\t",ex); } } if (errcount > 0) { b.insert(0,"scan partially performed with "+errcount+" error(s):"); throw new RuntimeException(b.toString()); } } // List of scheduled session task. Needed by stop() to cancel // scheduled sessions. There's usually at most 1 session in // this list (unless there's a bug somewhere ;-)) // private final ConcurrentLinkedQueue<SessionTask> tasklist = new ConcurrentLinkedQueue<SessionTask>(); // Used to give a unique id to session task - useful for // debugging. // private volatile static long taskcount = 0; /** * A session task will be scheduled to run in background in a * timer thread. There can be at most one session task running * at a given time (this is ensured by using a timer - which is * a single threaded object). * * If the session needs to be repeated, it will reschedule an * identical session when it finishes to run. This ensure that * two session runs are separated by the given interval time. * **/ private class SessionTask extends TimerTask { /** * Delay after which the next iteration of this task will * start. This delay is measured starting at the end of * the previous iteration. **/ final long delayBeforeNext; /** * A unique id for this task. **/ final long taskid; /**
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -