📄 remotemonitorpeerinfohandler.java
字号:
/**
 * Sends a register-monitor query to {@code peerID} and records the pending
 * request so that the response can later be routed to {@code monitorListener}.
 *
 * <p>Two timer tasks are armed: a one-shot task that reports
 * {@link MonitorEvent#TIMEOUT} and forgets the request if no response has
 * arrived after {@code timeout} ms, and the periodic check installed by
 * {@link #scheduleTimeout(RequestInfo)}.
 *
 * @param peerID            peer the query is sent to
 * @param monitorFilter     filter passed into the register query
 * @param reportRate        report rate passed into the register query
 * @param includeCumulative flag passed into the register query
 * @param monitorListener   listener notified of reports and failures
 * @param lease             requested lease, stored on the request and sent in the query
 * @param timeout           delay in milliseconds used for both timer tasks
 * @param peerInfoMessenger messenger used to send the request
 * @throws MonitorException declared by this method; presumably propagated
 *                          from the messenger or query factory (not visible here)
 */
public void addRemoteMonitorListener(PeerID peerID, MonitorFilter monitorFilter, long reportRate, boolean includeCumulative, MonitorListener monitorListener, long lease, long timeout, PeerInfoMessenger peerInfoMessenger) throws MonitorException {
    int queryId = peerInfoServiceImpl.getNextQueryId();
    RemoteMonitorQuery remoteMonitorQuery = RemoteMonitorQuery.createRegisterMonitorQuery(includeCumulative, monitorFilter, reportRate, lease);

    peerInfoMessenger.sendPeerInfoRequest(queryId, peerID, MONITOR_HANDLER_NAME, remoteMonitorQuery);

    final RequestInfo requestInfo = new RequestInfo(peerID, queryId, monitorListener, timeout, peerInfoMessenger);
    requestInfo.requestedLease = lease;
    requestInfos.put(queryId, requestInfo);

    // One-shot check: if nothing at all came back within the timeout,
    // report the failure and drop the pending request.
    timer.schedule(new TimerTask() {
        @Override
        public void run() {
            if (!requestInfo.responseReceived) {
                MonitorEvent monitorEvent = MonitorEvent.createFailureEvent(MonitorEvent.TIMEOUT, requestInfo.peerId, requestInfo.queryId);
                requestInfo.monitorListener.monitorRequestFailed(monitorEvent);
                requestInfos.remove(Integer.valueOf(requestInfo.queryId));
            }
        }
    }, timeout);

    // Periodic check that keeps watching the request after the first response.
    scheduleTimeout(requestInfo);
}

/**
 * Asks {@code peerID} to remove the monitor that was registered for
 * {@code monitorListener}.
 *
 * <p>The first pending request whose listener is identical
 * ({@code ==}) to {@code monitorListener} is looked up; its lease id is
 * sent in a remove-monitor query. After {@code timeout} ms the original
 * registration entry is dropped locally regardless of whether the remote
 * peer answered. If no request is registered for the listener the method
 * returns without sending or scheduling anything.
 *
 * @param peerID            peer the remove query is sent to
 * @param monitorListener   listener whose registration should be removed
 * @param timeout           delay in milliseconds before the original entry is dropped
 * @param peerInfoMessenger messenger used to send the request
 * @throws MonitorException declared by this method; presumably propagated
 *                          from the messenger or query factory (not visible here)
 */
public void removeRemoteMonitorListener(PeerID peerID, MonitorListener monitorListener, long timeout, PeerInfoMessenger peerInfoMessenger) throws MonitorException {
    // Allocated up front, exactly as before, so the id sequence is unchanged
    // even when no matching registration is found.
    int queryId = peerInfoServiceImpl.getNextQueryId();

    RequestInfo oldRequestInfo = null;
    for (Enumeration<RequestInfo> e = requestInfos.elements(); e.hasMoreElements();) {
        RequestInfo ri = e.nextElement();
        if (ri.monitorListener == monitorListener) {
            oldRequestInfo = ri;
            break;
        }
    }

    if (oldRequestInfo == null) {
        // Nothing registered for this listener. Previously a timer task was
        // still scheduled and dereferenced the null reference; an exception
        // escaping TimerTask.run() terminates the Timer's thread, which
        // would disable every other timeout and lease renewal sharing it.
        return;
    }

    RemoteMonitorQuery remoteMonitorQuery = RemoteMonitorQuery.createRemoveMonitorListenerQuery(oldRequestInfo.leaseId);
    peerInfoMessenger.sendPeerInfoRequest(queryId, peerID, MONITOR_HANDLER_NAME, remoteMonitorQuery);

    RequestInfo removalRequest = new RequestInfo(peerID, queryId, monitorListener, timeout, peerInfoMessenger);
    removalRequest.origRequestId = oldRequestInfo.queryId;
    requestInfos.put(queryId, removalRequest);

    // Forget the original registration after the timeout even if the
    // remote peer never confirms the removal.
    final int origQueryId = oldRequestInfo.queryId;
    timer.schedule(new TimerTask() {
        @Override
        public void run() {
            requestInfos.remove(Integer.valueOf(origQueryId));
        }
    }, timeout);
}

/**
 * Issues a remove request, via
 * {@link #removeRemoteMonitorListener(PeerID, MonitorListener, long, PeerInfoMessenger)},
 * for every pending request whose listener is identical ({@code ==}) to
 * {@code monitorListener}, using the peer id stored on that request.
 *
 * @param monitorListener   listener whose registrations should be removed
 * @param timeout           timeout in milliseconds forwarded to each remove request
 * @param peerInfoMessenger messenger forwarded to each remove request
 * @throws MonitorException if one of the individual remove requests throws it
 */
public void removeRemoteMonitorListener(MonitorListener monitorListener, long timeout, PeerInfoMessenger peerInfoMessenger) throws MonitorException {
    // Iterate over a snapshot: each remove request inserts a new entry
    // (carrying the same listener) into requestInfos, and mutating the map
    // while enumerating it could skip entries or revisit the new ones.
    // Fully-qualified names are used so no new import is required.
    java.util.List<RequestInfo> snapshot = java.util.Collections.list(requestInfos.elements());
    for (RequestInfo requestInfo : snapshot) {
        if (requestInfo.monitorListener == monitorListener) {
            removeRemoteMonitorListener(requestInfo.peerId, monitorListener, timeout, peerInfoMessenger);
        }
    }
}

/**
 * Decodes an incoming request element as a {@link RemoteMonitorQuery} and
 * dispatches it to the matching {@code handle...Query} method.
 *
 * <p>Any exception raised while decoding or handling is logged at FINE
 * level and otherwise suppressed. A query matching none of the known
 * kinds is silently dropped.
 *
 * @param queryId              id of the incoming query
 * @param requestSourceID      peer that sent the query
 * @param peerInfoQueryMessage enclosing query message (not used by this method)
 * @param requestElement       element holding the serialized query
 * @param peerInfoMessenger    messenger handed to the handlers
 */
public void processRequest(int queryId, PeerID requestSourceID, PeerInfoQueryMessage peerInfoQueryMessage, Element requestElement, PeerInfoMessenger peerInfoMessenger) {
    try {
        RemoteMonitorQuery remoteMonitorQuery = (RemoteMonitorQuery) DocumentSerializableUtilities.getDocumentSerializable(requestElement, RemoteMonitorQuery.class);

        if (remoteMonitorQuery.isRegisterMonitorQuery()) {
            handleRegisterMonitorQuery(queryId, requestSourceID, remoteMonitorQuery, peerInfoMessenger);

        } else if (remoteMonitorQuery.isCumulativeReportQuery()) {
            handleCumulativeReportQuery(queryId, requestSourceID, remoteMonitorQuery.getMonitorFilter(), peerInfoMessenger);

        } else if (remoteMonitorQuery.isRemoveMonitorQuery()) {
            handleRemoveMonitorQuery(queryId, requestSourceID, remoteMonitorQuery, peerInfoMessenger);

        } else if (remoteMonitorQuery.isPeerMonitorInfoQuery()) {
            handlePeerMonitorInfoQuery(queryId, requestSourceID, peerInfoMessenger);

        } else if (remoteMonitorQuery.isLeaseRenewal()) {
            handleLeaseRenewalQuery(queryId, requestSourceID, remoteMonitorQuery, peerInfoMessenger);
        }
    } catch (Exception e) {
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.log(Level.FINE, "Monitor failed in processQuery", e);
        }
    }
}

/**
 * Decodes an incoming response element as a {@link RemoteMonitorResponse},
 * looks up the pending request for {@code queryId} and acts on the
 * response kind: records the lease, delivers reports, reports failures to
 * the listener, or reschedules a lease renewal.
 *
 * <p>Responses for unknown query ids are ignored. A
 * {@link DocumentSerializationException} while decoding is logged at FINE
 * level and otherwise suppressed.
 *
 * @param queryId                 id of the query this response answers
 * @param peerInfoResponseMessage enclosing response message (not used by this method)
 * @param responseElement         element holding the serialized response
 * @param peerInfoMessenger       messenger (not used by this method)
 */
public void processResponse(int queryId, PeerInfoResponseMessage peerInfoResponseMessage, Element responseElement, PeerInfoMessenger peerInfoMessenger) {
    RemoteMonitorResponse remoteMonitorResponse;

    try {
        remoteMonitorResponse = (RemoteMonitorResponse) DocumentSerializableUtilities.getDocumentSerializable(responseElement, RemoteMonitorResponse.class);
        RequestInfo requestInfo = requestInfos.get(Integer.valueOf(queryId));

        if (requestInfo != null) {
            requestInfo.responseReceived = true;
            resetTimeout(requestInfo);

            if (remoteMonitorResponse.isMonitorRegistered()) {
                int leaseId = remoteMonitorResponse.getLeaseId();
                long leaseLength = remoteMonitorResponse.getLease();

                requestInfo.leaseId = leaseId;
                scheduleLeaseRenewal(requestInfo, leaseLength);

            } else if (remoteMonitorResponse.isMonitorRemoved()) {
                // Drop both the original registration and the removal request.
                requestInfos.remove(Integer.valueOf(requestInfo.origRequestId));
                requestInfos.remove(Integer.valueOf(queryId));

            } else if (remoteMonitorResponse.isCumulativeReport() || remoteMonitorResponse.isMonitorReport()) {
                MonitorReport monitorReport = remoteMonitorResponse.getMonitorReport();
                MonitorEvent monitorEvent = MonitorEvent.createRemoteMonitorReportEvent(requestInfo.peerId, requestInfo.queryId, monitorReport);

                requestInfo.monitorListener.processMonitorReport(monitorEvent);

            } else if (remoteMonitorResponse.isInvalidFilter()) {
                MonitorEvent monitorEvent = MonitorEvent.createFailureEvent(MonitorEvent.INVALID_MONITOR_FILTER, requestInfo.peerId, requestInfo.queryId);

                requestInfo.monitorListener.monitorRequestFailed(monitorEvent);
                requestInfos.remove(Integer.valueOf(queryId));

            } else if (remoteMonitorResponse.isInvalidReportRate()) {
                MonitorEvent monitorEvent = MonitorEvent.createFailureEvent(MonitorEvent.INVALID_REPORT_RATE, requestInfo.peerId, requestInfo.queryId);

                requestInfo.monitorListener.monitorRequestFailed(monitorEvent);
                requestInfos.remove(Integer.valueOf(queryId));

            } else if (remoteMonitorResponse.isMeteringNotSupported()) {
                MonitorEvent monitorEvent = MonitorEvent.createFailureEvent(MonitorEvent.REFUSED, requestInfo.peerId, requestInfo.queryId);

                requestInfo.monitorListener.monitorRequestFailed(monitorEvent);
                requestInfos.remove(Integer.valueOf(queryId));

            } else if (remoteMonitorResponse.isRequestDenied()) {
                // NOTE(review): unlike the other failure branches, the request
                // is not removed from requestInfos here - confirm this is intended.
                MonitorEvent monitorEvent = MonitorEvent.createFailureEvent(MonitorEvent.REFUSED, requestInfo.peerId, requestInfo.queryId);

                requestInfo.monitorListener.monitorRequestFailed(monitorEvent);

            } else if (remoteMonitorResponse.isPeerMonitorInfo()) {
                PeerMonitorInfoEvent peerMonitorInfoEvent = new PeerMonitorInfoEvent(requestInfo.peerId, remoteMonitorResponse.getPeerMonitorInfo());

                requestInfo.peerMonitorInfoListener.peerMonitorInfoReceived(peerMonitorInfoEvent);
                requestInfos.remove(Integer.valueOf(queryId));

            } else if (remoteMonitorResponse.isLeaseRenewed()) {
                long lease = remoteMonitorResponse.getLease();
                int origRequestId = requestInfo.origRequestId;
                RequestInfo origRequest = requestInfos.get(Integer.valueOf(origRequestId));

                // The original registration may have been removed (timeout or
                // explicit removal) while the renewal was in flight; in that
                // case there is nothing left to renew.
                if (origRequest != null) {
                    scheduleLeaseRenewal(origRequest, lease);
                }
                requestInfos.remove(Integer.valueOf(queryId));
            }
        }
    } catch (DocumentSerializationException e) {
        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
            LOG.log(Level.FINE, "Document Serialization Failed", e);
        }
    }
}

/**
 * Records a new deadline for the request: the current time plus the
 * request's timeout.
 */
private void resetTimeout(RequestInfo requestInfo) {
    timeouts.put(requestInfo.queryId, requestInfo.timeout + System.currentTimeMillis());
}

/**
 * Returns the deadline recorded for {@code queryId} by
 * {@link #resetTimeout(RequestInfo)}.
 *
 * NOTE(review): presumably {@code timeouts} holds boxed {@code Long}
 * values, in which case this throws {@link NullPointerException} on
 * unboxing when no deadline has been recorded yet - confirm against the
 * field declaration.
 */
private long getTimeout(int queryId) {
    return timeouts.get(queryId);
}

/**
 * Installs a periodic task (initial delay and period both equal to the
 * request's timeout) that reports {@link MonitorEvent#TIMEOUT} to the
 * listener whenever the recorded deadline has passed. The task cancels
 * itself once the request is no longer present in {@code requestInfos}.
 */
private void scheduleTimeout(final RequestInfo requestInfo) {
    final int queryId = requestInfo.queryId;

    timer.schedule(new TimerTask() {
        @Override
        public void run() {
            if (requestInfos.containsKey(Integer.valueOf(queryId))) {
                try {
                    if (System.currentTimeMillis() > getTimeout(queryId)) {
                        MonitorEvent monitorEvent = MonitorEvent.createFailureEvent(MonitorEvent.TIMEOUT, requestInfo.peerId, queryId);

                        requestInfo.monitorListener.monitorRequestFailed(monitorEvent);
                    }
                } catch (Exception ignored) {
                    // Deliberately swallowed: an exception escaping run()
                    // would terminate the shared Timer thread. Failures from
                    // getTimeout or from the listener callback end up here.
                }
            } else {
                cancel();
            }
        }
    }, requestInfo.timeout, requestInfo.timeout);
}

/**
 * Schedules a one-shot lease renewal for the request.
 *
 * <p>The delay is {@code leaseLength - roundTrip - 30s}; nothing is
 * scheduled unless that delay exceeds {@code MIN_LEASE}. A failure inside
 * {@code renewLease} is logged at FINE level and otherwise suppressed.
 *
 * @param requestInfo request whose lease should be renewed
 * @param leaseLength lease length reported by the remote peer
 */
private void scheduleLeaseRenewal(RequestInfo requestInfo, long leaseLength) {
    // NOTE(review): if requestTime is the moment the request was created,
    // this difference is negative (past minus now), so it lengthens the
    // delay instead of shortening it. Left unchanged because requestTime's
    // semantics are not visible here - confirm and fix together with how
    // requestTime is maintained across renewals.
    long roundTrip = requestInfo.requestTime - System.currentTimeMillis();
    // 30s comfort zone.
    long renewTime = leaseLength - roundTrip - 30 * 1000L;
    final int queryId = requestInfo.queryId;

    if (renewTime > MIN_LEASE) {
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    renewLease(queryId);
                } catch (Exception e) {
                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {
                        LOG.log(Level.FINE, "Lease Renewal Failed", e);
                    }
                }
            }
        }, renewTime);
    }
}

private void handleRegisterMonitorQuery(final int queryId, final PeerID requestSourceID, RemoteMonitorQuery remoteMonitorQuery, final PeerInfoMessenger peerInfoMessenger) {
    MonitorFilter monitorFilter = remoteMonitorQuery.getMonitorFilter();
    long lease = remoteMonitorQuery.getLease();
    long reportRate = remoteMonitorQuery.getReportRate();
    boolean includeCumulative = remoteMonitorQuery.isIncludeCumulative();
    MonitorListener monitorListener = new MonitorListener() {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -