📄 remotemonitorpeerinfohandler.java
字号:
public void processMonitorReport(MonitorEvent monitorEvent) { MonitorReport monitorReport = monitorEvent.getMonitorReport(); try { RemoteMonitorResponse remoteMonitorResponse = RemoteMonitorResponse.createMonitorReportResponse(queryId, monitorReport); peerInfoMessenger.sendPeerInfoResponse(queryId, requestSourceID, MONITOR_HANDLER_NAME, remoteMonitorResponse); } catch (Exception e) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine(e.toString()); } } } public void monitorReportingCancelled(MonitorEvent monitorEvent) { throw new RuntimeException("METHOD NOT IMPLEMENTED"); } public void monitorRequestFailed(MonitorEvent monitorEvent) { throw new RuntimeException("METHOD NOT IMPLEMENTED"); } }; int leaseId = getNextLeaseId(); final LeaseInfo leaseInfo = new LeaseInfo(leaseId, requestSourceID, queryId, monitorListener, lease, peerInfoMessenger); long leaseTime = getLeaseTime(lease); setupLeaseTimeout(leaseInfo.leaseId, leaseTime); try { /* * Currently we can neither ask peers in the netgroup for transport * metrics, nor discover peers in the world group. Therefore we're * asking peers in the netgroup to send TransportMetrics, but that * peer is actually attaching the MonitorFilter to it's WorldGroup * peer. */ for (Iterator i = monitorFilter.getServiceMonitorFilters(); i.hasNext();) { ServiceMonitorFilter serviceMonitorFilter = (ServiceMonitorFilter) i.next(); if (serviceMonitorFilter.getModuleClassID().equals(MonitorResources.transportServiceMonitorClassID)) { try { MonitorFilter worldGroupFilter = new MonitorFilter("worldGroupFilter"); worldGroupFilter.addServiceMonitorFilter(serviceMonitorFilter); i.remove(); PeerGroup worldGroup = peerGroup.newGroup(PeerGroupID.worldPeerGroupID); PeerInfoService worldService = worldGroup.getPeerInfoService(); worldService.addMonitorListener(worldGroupFilter, remoteMonitorQuery.getReportRate(), includeCumulative, monitorListener); leaseInfo.listenerAddedToWorldGroup = true; leaseInfo.worldGroup = worldGroup; } catch (PeerGroupException e) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine(e.toString()); } } } } if (monitorFilter.getServiceMonitorFilterCount() > 0) { peerInfoServiceImpl.addMonitorListener(monitorFilter, reportRate, includeCumulative, monitorListener); } leaseInfos.put(leaseId, leaseInfo); RemoteMonitorResponse remoteMonitorResponse = RemoteMonitorResponse.createMonitorRegisteredResponse(queryId, leaseId, leaseTime); peerInfoMessenger.sendPeerInfoResponse(queryId, requestSourceID, MONITOR_HANDLER_NAME, remoteMonitorResponse); } catch (MonitorFilterException e) { RemoteMonitorResponse remoteMonitorResponse = RemoteMonitorResponse.createInvalidFilterResponse(queryId); peerInfoMessenger.sendPeerInfoResponse(queryId, requestSourceID, MONITOR_HANDLER_NAME, remoteMonitorResponse); } catch (MonitorException e) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine(e.toString()); } } } private void handleRemoveMonitorQuery(int queryId, PeerID requestSourceID, RemoteMonitorQuery remoteMonitorQuery, PeerInfoMessenger peerInfoMessenger) { try { int leaseId = remoteMonitorQuery.getLeaseId(); LeaseInfo leaseInfo = leaseInfos.get(new Integer(leaseId)); if (leaseInfo != null) { MonitorListener monitorListener = leaseInfo.monitorListener; peerInfoServiceImpl.removeMonitorListener(monitorListener); if (leaseInfo.listenerAddedToWorldGroup) { PeerInfoService peerInfoService = leaseInfo.worldGroup.getPeerInfoService(); peerInfoService.removeMonitorListener(monitorListener); } RemoteMonitorResponse remoteMonitorResponse = RemoteMonitorResponse.createMonitorRemovedResponse(queryId); peerInfoMessenger.sendPeerInfoResponse(queryId, requestSourceID, MONITOR_HANDLER_NAME, remoteMonitorResponse); } } catch (MonitorException e) { // Currently not thrown by MonitorManager.removeMonitorListener() if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine(e.toString()); } } } private void handleCumulativeReportQuery(int queryId, PeerID requestSourceID, MonitorFilter monitorFilter, PeerInfoMessenger peerInfoMessenger) throws MonitorException, DocumentSerializationException { MonitorReport monitorReport = peerInfoServiceImpl.getCumulativeMonitorReport(monitorFilter); RemoteMonitorResponse remoteMonitorResponse = RemoteMonitorResponse.createCumulativeReportResponse(queryId, monitorReport); peerInfoMessenger.sendPeerInfoResponse(queryId, requestSourceID, MONITOR_HANDLER_NAME, remoteMonitorResponse); } private void handlePeerMonitorInfoQuery(int queryId, PeerID requestSourceID, PeerInfoMessenger peerInfoMessenger) throws DocumentSerializationException { // FIX-ME: /* Asking the NetGroup Peer won't tell me if it supports transport * monitoring or not, but asking the world group guy gives me * everything I need because as currently implemented you can't turn * monitoring on or off at the PeerGroup level, only the device level. */ try { PeerGroup worldGroup = peerGroup.newGroup(PeerGroupID.worldPeerGroupID); PeerInfoService worldService = worldGroup.getPeerInfoService(); PeerMonitorInfo peerMonitorInfo = worldService.getPeerMonitorInfo(); RemoteMonitorResponse remoteMonitorResponse = RemoteMonitorResponse.createPeerMonitorInfoResponse(queryId, peerMonitorInfo); peerInfoMessenger.sendPeerInfoResponse(queryId, requestSourceID, MONITOR_HANDLER_NAME, remoteMonitorResponse); } catch (PeerGroupException e) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine(e.toString()); } } } private void handleLeaseRenewalQuery(int queryId, PeerID requestSourceID, RemoteMonitorQuery remoteMonitorQuery, PeerInfoMessenger peerInfoMessenger) throws DocumentSerializationException { int leaseId = remoteMonitorQuery.getLeaseId(); LeaseInfo leaseInfo = leaseInfos.get(new Integer(leaseId)); if (leaseInfo != null) { long reqLease = remoteMonitorQuery.getLease(); long lease = getLeaseTime(reqLease); leaseInfo.validUntil = System.currentTimeMillis() + lease; setupLeaseTimeout(leaseInfo.leaseId, lease); RemoteMonitorResponse remoteMonitorResponse = RemoteMonitorResponse.createLeaseRenewedResponse(queryId, leaseInfo.leaseId, lease); peerInfoMessenger.sendPeerInfoResponse(queryId, requestSourceID, MONITOR_HANDLER_NAME, remoteMonitorResponse); } else { RemoteMonitorResponse remoteMonitorResponse = RemoteMonitorResponse.createDeniedResponse(queryId); peerInfoMessenger.sendPeerInfoResponse(queryId, requestSourceID, MONITOR_HANDLER_NAME, remoteMonitorResponse); } } long getLeaseTime(long requestedLease) { long leaseTime = requestedLease < MAX_LEASE ? requestedLease : MAX_LEASE; leaseTime = leaseTime > MIN_LEASE ? leaseTime : MIN_LEASE; return leaseTime; } private void cancelLease(LeaseInfo leaseInfo) throws MonitorException, DocumentSerializationException { if (leaseInfo.listenerAddedToWorldGroup) { leaseInfo.worldGroup.getPeerInfoService().removeMonitorListener(leaseInfo.monitorListener); } RemoteMonitorResponse remoteMonitorResponse = RemoteMonitorResponse.createLeaseEndedResponse(leaseInfo.queryId, leaseInfo.leaseId); leaseInfo.peerInfoMessenger.sendPeerInfoResponse(leaseInfo.queryId, leaseInfo.peerID, MONITOR_HANDLER_NAME, remoteMonitorResponse); } private void renewLease(int queryId) { try { RequestInfo requestInfo = requestInfos.get(new Integer(queryId)); if (requestInfo != null) { int renewalQueryId = peerInfoServiceImpl.getNextQueryId(); PeerID peerID = requestInfo.peerId; long timeout = requestInfo.timeout; RemoteMonitorQuery remoteMonitorQuery = RemoteMonitorQuery.createLeaseRenewalQuery(requestInfo.leaseId, requestInfo.requestedLease); requestInfo.peerInfoMessenger.sendPeerInfoRequest(queryId, peerID, MONITOR_HANDLER_NAME, remoteMonitorQuery); final RequestInfo renewalRequestInfo = new RequestInfo(peerID, queryId, timeout, requestInfo.peerInfoMessenger); renewalRequestInfo.requestedLease = requestInfo.requestedLease; renewalRequestInfo.origRequestId = queryId; requestInfos.put(renewalQueryId, renewalRequestInfo); } } catch (Exception e) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "error while attempting Monitor lease renewal", e); } } } private void setupLeaseTimeout(final int leaseId, long lease) { timer.schedule(new TimerTask() { @Override public void run() { LeaseInfo leaseInfo = leaseInfos.get(new Integer(leaseId)); if (leaseInfo != null) { long currentTime = System.currentTimeMillis(); if (leaseInfo.validUntil <= currentTime) { try { cancelLease(leaseInfo); } catch (Exception e) { // ignored } finally { leaseInfos.remove(leaseInfo.queryId); } } } } }, lease); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -