pubsubmodule.java
来自「基于Jabber协议的即时消息服务器」· Java 代码 · 共 694 行 · 第 1/2 页
JAVA
694 行
/** * $RCSfile: $ * $Revision: $ * $Date: $ * * Copyright (C) 2006 Jive Software. All rights reserved. * * This software is published under the terms of the GNU Public License (GPL), * a copy of which is included in this distribution. */package org.jivesoftware.wildfire.pubsub;import org.dom4j.DocumentHelper;import org.dom4j.Element;import org.jivesoftware.util.JiveGlobals;import org.jivesoftware.util.LocaleUtils;import org.jivesoftware.util.Log;import org.jivesoftware.util.StringUtils;import org.jivesoftware.wildfire.PacketRouter;import org.jivesoftware.wildfire.RoutableChannelHandler;import org.jivesoftware.wildfire.RoutingTable;import org.jivesoftware.wildfire.XMPPServer;import org.jivesoftware.wildfire.auth.UnauthorizedException;import org.jivesoftware.wildfire.container.BasicModule;import org.jivesoftware.wildfire.disco.DiscoInfoProvider;import org.jivesoftware.wildfire.disco.DiscoItemsProvider;import org.jivesoftware.wildfire.disco.DiscoServerItem;import org.jivesoftware.wildfire.disco.ServerItemsProvider;import org.jivesoftware.wildfire.forms.DataForm;import org.jivesoftware.wildfire.forms.spi.XDataFormImpl;import org.jivesoftware.wildfire.forms.spi.XFormFieldImpl;import org.jivesoftware.wildfire.pubsub.models.AccessModel;import org.jivesoftware.wildfire.pubsub.models.PublisherModel;import org.xmpp.packet.*;import java.util.*;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.CopyOnWriteArrayList;/** * Module that implements JEP-60: Publish-Subscribe. By default node collections and * instant nodes are supported. * * @author Matt Tucker */public class PubSubModule extends BasicModule implements ServerItemsProvider, DiscoInfoProvider, DiscoItemsProvider, RoutableChannelHandler, PubSubService { /** * the chat service's hostname */ private String serviceName = null; /** * Collection node that acts as the root node of the entire node hierarchy. */ private CollectionNode rootCollectionNode = null; /** * Nodes managed by this manager, table: key nodeID (String); value Node */ private Map<String, Node> nodes = new ConcurrentHashMap<String, Node>(); /** * Returns the permission policy for creating nodes. A true value means that not anyone can * create a node, only the JIDs listed in <code>allowedToCreate</code> are allowed to create * nodes. */ private boolean nodeCreationRestricted = false; /** * Bare jids of users that are allowed to create nodes. An empty list means that anyone can * create nodes. */ private Collection<String> allowedToCreate = new CopyOnWriteArrayList<String>(); /** * Bare jids of users that are system administrators of the PubSub service. A sysadmin * has the same permissions as a node owner. */ private Collection<String> sysadmins = new CopyOnWriteArrayList<String>(); /** * The packet router for the server. */ private PacketRouter router = null; private RoutingTable routingTable = null; /** * Default configuration to use for newly created leaf nodes. */ private DefaultNodeConfiguration leafDefaultConfiguration; /** * Default configuration to use for newly created collection nodes. */ private DefaultNodeConfiguration collectionDefaultConfiguration; /** * Private component that actually performs the pubsub work. */ private PubSubEngine engine = null; public PubSubModule() { super("Publish Subscribe Service"); } public void process(Packet packet) { // TODO Remove this method when moving PubSub as a component and removing module code // The MUC service will receive all the packets whose domain matches the domain of the MUC // service. This means that, for instance, a disco request should be responded by the // service itself instead of relying on the server to handle the request. try { // Check if the packet is a disco request or a packet with namespace iq:register if (packet instanceof IQ) { if (!engine.process((IQ) packet)) { process((IQ) packet); } } else if (packet instanceof Presence) { engine.process((Presence) packet); } else { engine.process((Message) packet); } } catch (Exception e) { Log.error(LocaleUtils.getLocalizedString("admin.error"), e); if (packet instanceof IQ) { // Send internal server error IQ reply = IQ.createResultIQ((IQ) packet); reply.setError(PacketError.Condition.internal_server_error); send(reply); } } } private void process(IQ iq) { // Ignore IQs of type ERROR if (IQ.Type.error == iq.getType()) { return; } Element childElement = iq.getChildElement(); String namespace = null; if (childElement != null) { namespace = childElement.getNamespaceURI(); } if ("http://jabber.org/protocol/disco#info".equals(namespace)) { try { // TODO PubSub should have an IQDiscoInfoHandler of its own when PubSub becomes // a component IQ reply = XMPPServer.getInstance().getIQDiscoInfoHandler().handleIQ(iq); router.route(reply); } catch (UnauthorizedException e) { // Do nothing. This error should never happen } } else if ("http://jabber.org/protocol/disco#items".equals(namespace)) { try { // TODO PubSub should have an IQDiscoItemsHandler of its own when PubSub becomes // a component IQ reply = XMPPServer.getInstance().getIQDiscoItemsHandler().handleIQ(iq); router.route(reply); } catch (UnauthorizedException e) { // Do nothing. This error should never happen } } else { // Unknown namespace requested so return error to sender engine.sendErrorPacket(iq, PacketError.Condition.service_unavailable, null); } } public String getServiceID() { return "pubsub"; } public boolean canCreateNode(JID creator) { // Node creation is always allowed for sysadmin if (isNodeCreationRestricted() && !isServiceAdmin(creator)) { // The user is not allowed to create nodes return false; } return true; } public boolean isServiceAdmin(JID user) { return sysadmins.contains(user.toBareJID()) || allowedToCreate.contains(user.toBareJID()); } public boolean isInstantNodeSupported() { return true; } public boolean isCollectionNodesSupported() { return true; } public CollectionNode getRootCollectionNode() { return rootCollectionNode; } public DefaultNodeConfiguration getDefaultNodeConfiguration(boolean leafType) { if (leafType) { return leafDefaultConfiguration; } return collectionDefaultConfiguration; } public Collection<String> getShowPresences(JID subscriber) { return engine.getShowPresences(subscriber); } public void presenceSubscriptionNotRequired(Node node, JID user) { engine.presenceSubscriptionNotRequired(node, user); } public void presenceSubscriptionRequired(Node node, JID user) { engine.presenceSubscriptionRequired(node, user); } public PubSubEngine getPubSubEngine() { return engine; } public String getServiceName() { return serviceName; } public String getServiceDomain() { return serviceName + "." + XMPPServer.getInstance().getServerInfo().getName(); } public JID getAddress() { // TODO Cache this JID for performance? return new JID(null, getServiceDomain(), null); } public Collection<String> getUsersAllowedToCreate() { return allowedToCreate; } public Collection<String> getSysadmins() { return sysadmins; } public void addSysadmin(String userJID) { sysadmins.add(userJID.trim().toLowerCase()); // Update the config. String[] jids = new String[sysadmins.size()]; jids = sysadmins.toArray(jids); JiveGlobals.setProperty("xmpp.pubsub.sysadmin.jid", fromArray(jids)); } public void removeSysadmin(String userJID) { sysadmins.remove(userJID.trim().toLowerCase()); // Update the config. String[] jids = new String[sysadmins.size()]; jids = sysadmins.toArray(jids); JiveGlobals.setProperty("xmpp.pubsub.sysadmin.jid", fromArray(jids)); } public boolean isNodeCreationRestricted() { return nodeCreationRestricted; } public void setNodeCreationRestricted(boolean nodeCreationRestricted) { this.nodeCreationRestricted = nodeCreationRestricted; JiveGlobals.setProperty("xmpp.pubsub.create.anyone", Boolean.toString(nodeCreationRestricted)); } public void addUserAllowedToCreate(String userJID) { // Update the list of allowed JIDs to create nodes. allowedToCreate.add(userJID.trim().toLowerCase()); // Update the config. String[] jids = new String[allowedToCreate.size()]; jids = allowedToCreate.toArray(jids); JiveGlobals.setProperty("xmpp.pubsub.create.jid", fromArray(jids)); } public void removeUserAllowedToCreate(String userJID) { // Update the list of allowed JIDs to create nodes. allowedToCreate.remove(userJID.trim().toLowerCase()); // Update the config. String[] jids = new String[allowedToCreate.size()]; jids = allowedToCreate.toArray(jids); JiveGlobals.setProperty("xmpp.pubsub.create.jid", fromArray(jids)); } public void initialize(XMPPServer server) { super.initialize(server); serviceName = JiveGlobals.getProperty("xmpp.pubsub.service"); if (serviceName == null) { serviceName = "pubsub"; } // Load the list of JIDs that are sysadmins of the PubSub service String property = JiveGlobals.getProperty("xmpp.pubsub.sysadmin.jid"); String[] jids; if (property != null) { jids = property.split(","); for (String jid : jids) { sysadmins.add(jid.trim().toLowerCase()); } } nodeCreationRestricted = Boolean.parseBoolean(JiveGlobals.getProperty("xmpp.pubsub.create.anyone", "false")); // Load the list of JIDs that are allowed to create nodes property = JiveGlobals.getProperty("xmpp.pubsub.create.jid"); if (property != null) { jids = property.split(","); for (String jid : jids) { allowedToCreate.add(jid.trim().toLowerCase()); } } routingTable = server.getRoutingTable(); router = server.getPacketRouter(); engine = new PubSubEngine(this, server.getPacketRouter()); // Load default configuration for leaf nodes leafDefaultConfiguration = PubSubPersistenceManager.loadDefaultConfiguration(this, true); if (leafDefaultConfiguration == null) { // Create and save default configuration for leaf nodes; leafDefaultConfiguration = new DefaultNodeConfiguration(true); leafDefaultConfiguration.setAccessModel(AccessModel.open); leafDefaultConfiguration.setPublisherModel(PublisherModel.publishers); leafDefaultConfiguration.setDeliverPayloads(true); leafDefaultConfiguration.setLanguage("English"); leafDefaultConfiguration.setMaxPayloadSize(5120); leafDefaultConfiguration.setNotifyConfigChanges(true); leafDefaultConfiguration.setNotifyDelete(true); leafDefaultConfiguration.setNotifyRetract(true); leafDefaultConfiguration.setPersistPublishedItems(false); leafDefaultConfiguration.setMaxPublishedItems(-1); leafDefaultConfiguration.setPresenceBasedDelivery(false); leafDefaultConfiguration.setSendItemSubscribe(true); leafDefaultConfiguration.setSubscriptionEnabled(true); leafDefaultConfiguration.setReplyPolicy(null); PubSubPersistenceManager.createDefaultConfiguration(this, leafDefaultConfiguration); } // Load default configuration for collection nodes collectionDefaultConfiguration =
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?