📄 iqpephandler.java
字号:
/** * $RCSfile: $ * $Revision: $ * $Date: $ * * Copyright (C) 2008 Jive Software. All rights reserved. * * This software is published under the terms of the GNU Public License (GPL), * a copy of which is included in this distribution, or a commercial license * agreement with Jive. */package org.jivesoftware.openfire.pep;import org.dom4j.DocumentHelper;import org.dom4j.Element;import org.dom4j.QName;import org.jivesoftware.database.DbConnectionManager;import org.jivesoftware.openfire.IQHandlerInfo;import org.jivesoftware.openfire.XMPPServer;import org.jivesoftware.openfire.auth.UnauthorizedException;import org.jivesoftware.openfire.disco.ServerFeaturesProvider;import org.jivesoftware.openfire.disco.ServerIdentitiesProvider;import org.jivesoftware.openfire.disco.UserIdentitiesProvider;import org.jivesoftware.openfire.disco.UserItemsProvider;import org.jivesoftware.openfire.event.UserEventDispatcher;import org.jivesoftware.openfire.event.UserEventListener;import org.jivesoftware.openfire.handler.IQHandler;import org.jivesoftware.openfire.pubsub.*;import org.jivesoftware.openfire.pubsub.models.AccessModel;import org.jivesoftware.openfire.roster.Roster;import org.jivesoftware.openfire.roster.RosterEventDispatcher;import org.jivesoftware.openfire.roster.RosterEventListener;import org.jivesoftware.openfire.roster.RosterItem;import org.jivesoftware.openfire.session.ClientSession;import org.jivesoftware.openfire.user.*;import org.jivesoftware.util.JiveGlobals;import org.jivesoftware.util.Log;import org.xmpp.forms.DataForm;import org.xmpp.forms.FormField;import org.xmpp.packet.IQ;import org.xmpp.packet.JID;import org.xmpp.packet.PacketError;import org.xmpp.packet.Presence;import java.sql.Connection;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;import java.util.*;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.LinkedBlockingQueue;/** * <p> * An {@link IQHandler} used to implement XEP-0163: "Personal Eventing via Pubsub" * Version 1.0 * </p> * * <p> * For each user on the server there is an associated {@link PEPService} interacting * with a single {@link PubSubEngine} for managing the user's PEP nodes. * </p> * * <p> * An IQHandler can only handle one namespace in its IQHandlerInfo. However, PEP * related packets are seen having a variety of different namespaces. Thus, * classes like {@link IQPEPOwnerHandler} are used to forward packets having these other * namespaces to {@link IQPEPHandler#handleIQ(IQ)}. * <p> * * <p> * This handler is used for the following namespaces: * <ul> * <li><i>http://jabber.org/protocol/pubsub</i></li> * <li><i>http://jabber.org/protocol/pubsub#owner</i></li> * </ul> * </p> * * @author Armando Jagucki * */public class IQPEPHandler extends IQHandler implements ServerIdentitiesProvider, ServerFeaturesProvider, UserIdentitiesProvider, UserItemsProvider, PresenceEventListener, RemotePresenceEventListener, RosterEventListener, UserEventListener { final static String GET_PEP_SERVICE = "SELECT DISTINCT serviceID FROM pubsubNode WHERE serviceID=?"; /** * Map of PEP services. Table, Key: bare JID (String); Value: PEPService */ private Map<String, PEPService> pepServices; private IQHandlerInfo info; private PubSubEngine pubSubEngine = null; /** * Queue that will store the JID of the local users that came online. This queue * will be consumed by another thread to improve performance of the server. */ private static BlockingQueue<JID> availableSessions = new LinkedBlockingQueue<JID>(); /** * A map of all known full JIDs that have sent presences from a remote server. * table: key Bare JID (String); value Set of JIDs * * This map is convenient for sending notifications to the full JID of remote users * that have sent available presences to the PEP service. */ private Map<String, Set<JID>> knownRemotePresences = new ConcurrentHashMap<String, Set<JID>>(); public IQPEPHandler() { super("Personal Eventing Handler"); pepServices = new ConcurrentHashMap<String, PEPService>(); info = new IQHandlerInfo("pubsub", "http://jabber.org/protocol/pubsub"); // Create a thread that will process the queued JIDs of the sessions that came online. We // are processing the events one at a time so we no longer have the paralellism to the database // that was slowing down the server Thread thread = new Thread("PEP avaiable sessions handler ") { public void run() { final XMPPServer server = XMPPServer.getInstance(); while (!server.isShuttingDown()) { try { JID availableSessionJID = availableSessions.take(); // Send the last published items for the contacts on availableSessionJID's roster. try { Roster roster = server.getRosterManager().getRoster(availableSessionJID.getNode()); for (RosterItem item : roster.getRosterItems()) { if (server.isLocal(item.getJid()) && (item.getSubStatus() == RosterItem.SUB_BOTH || item.getSubStatus() == RosterItem.SUB_TO)) { PEPService pepService = getPEPService(item.getJid().toBareJID()); if (pepService != null) { pepService.sendLastPublishedItems(availableSessionJID); } } } } catch (UserNotFoundException e) { // Do nothing } } catch (Exception e) { Log.error(e); } } } }; thread.setDaemon(true); thread.start(); } @Override public void initialize(XMPPServer server) { super.initialize(server); // Listen to presence events to manage PEP auto-subscriptions. PresenceEventDispatcher.addListener(this); // Listen to remote presence events to manage the knownRemotePresences map. RemotePresenceEventDispatcher.addListener(this); // Listen to roster events for PEP subscription cancelling on contact deletion. RosterEventDispatcher.addListener(this); // Listen to user events in order to destroy a PEP service when a user is deleted. UserEventDispatcher.addListener(this); pubSubEngine = new PubSubEngine(server.getPacketRouter()); } /** * Returns true if the PEP service is enabled in the server. * * @return true if the PEP service is enabled in the server. */ public boolean isEnabled() { return JiveGlobals.getBooleanProperty("xmpp.pep.enabled", true); } /** * Loads a PEP service from the database, if it exists. * * @param jid the JID of the owner of the PEP service. * @return the loaded PEP service, or null if not found. */ private PEPService loadPEPServiceFromDB(String jid) { PEPService pepService = null; Connection con = null; PreparedStatement pstmt = null; try { con = DbConnectionManager.getConnection(); // Get all PEP services pstmt = con.prepareStatement(GET_PEP_SERVICE); pstmt.setString(1, jid); ResultSet rs = pstmt.executeQuery(); // Restore old PEPServices while (rs.next()) { String serviceID = rs.getString(1); // Create a new PEPService pepService = new PEPService(XMPPServer.getInstance(), serviceID); pepServices.put(serviceID, pepService); pubSubEngine.start(pepService); if (Log.isDebugEnabled()) { Log.debug("PEP: Restored service for " + serviceID + " from the database."); } } rs.close(); pstmt.close(); } catch (SQLException sqle) { Log.error(sqle); } finally { try { if (pstmt != null) pstmt.close(); } catch (Exception e) { Log.error(e); } try { if (con != null) con.close(); } catch (Exception e) { Log.error(e); } } return pepService; } public void stop() { super.stop(); for (PEPService service : pepServices.values()) { pubSubEngine.shutdown(service); } } public void destroy() { super.destroy(); // Remove listeners PresenceEventDispatcher.removeListener(this); RemotePresenceEventDispatcher.removeListener(this); RosterEventDispatcher.removeListener(this); UserEventDispatcher.removeListener(this); } @Override public IQHandlerInfo getInfo() { return info; } /** * Returns the knownRemotePresences map. * * @return the knownRemotePresences map */ public Map<String, Set<JID>> getKnownRemotePresenes() { return knownRemotePresences; } @Override public IQ handleIQ(IQ packet) throws UnauthorizedException { // Do nothing if server is not enabled if (!isEnabled()) { IQ reply = IQ.createResultIQ(packet); reply.setChildElement(packet.getChildElement().createCopy()); reply.setError(PacketError.Condition.service_unavailable); return reply; } JID senderJID = packet.getFrom(); if (packet.getTo() == null) { if (packet.getType() == IQ.Type.set) { String jidFrom = senderJID.toBareJID(); PEPService pepService = getPEPService(jidFrom); // If no service exists yet for jidFrom, create one. if (pepService == null) { // Return an error if the packet is from an anonymous, unregistered user // or remote user if (!XMPPServer.getInstance().isLocal(senderJID) || !UserManager.getInstance().isRegisteredUser(senderJID.getNode())) { IQ reply = IQ.createResultIQ(packet); reply.setChildElement(packet.getChildElement().createCopy()); reply.setError(PacketError.Condition.not_allowed); return reply; } pepService = new PEPService(XMPPServer.getInstance(), jidFrom); pepServices.put(jidFrom, pepService); // Probe presences pubSubEngine.start(pepService); if (Log.isDebugEnabled()) { Log.debug("PEP: " + jidFrom + " had a PEPService created"); } // Those who already have presence subscriptions to jidFrom // will now automatically be subscribed to this new PEPService. try { Roster roster = XMPPServer.getInstance().getRosterManager().getRoster(senderJID.getNode()); for (RosterItem item : roster.getRosterItems()) { if (item.getSubStatus() == RosterItem.SUB_BOTH || item.getSubStatus() == RosterItem.SUB_FROM) { createSubscriptionToPEPService(pepService, item.getJid(), senderJID); } } } catch (UserNotFoundException e) { // Do nothing } } // If publishing a node, and the node doesn't exist, create it. Element childElement = packet.getChildElement(); Element publishElement = childElement.element("publish"); if (publishElement != null) { String nodeID = publishElement.attributeValue("node"); // Do not allow User Avatar nodes to be created. // TODO: Implement XEP-0084 if (nodeID.startsWith("http://www.xmpp.org/extensions/xep-0084.html")) { IQ reply = IQ.createResultIQ(packet); reply.setChildElement(packet.getChildElement().createCopy()); reply.setError(PacketError.Condition.feature_not_implemented); return reply; } if (pepService.getNode(nodeID) == null) { // Create the node JID creator = new JID(jidFrom); LeafNode newNode = new LeafNode(pepService, pepService.getRootCollectionNode(), nodeID, creator); newNode.addOwner(creator); newNode.saveToDB(); } } // Process with PubSub as usual. pubSubEngine.process(pepService, packet); } } else if (packet.getType() == IQ.Type.get || packet.getType() == IQ.Type.set) { String jidTo = packet.getTo().toBareJID();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -