📄 routingtableimpl.java
字号:
/**
* $RCSfile: RoutingTableImpl.java,v $
* $Revision: 3138 $
* $Date: 2005-12-01 02:13:26 -0300 (Thu, 01 Dec 2005) $
*
* Copyright (C) 2008 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution, or a commercial license
* agreement with Jive.
*/
package org.jivesoftware.openfire.spi;
import org.jivesoftware.openfire.*;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.cluster.ClusterEventListener;
import org.jivesoftware.openfire.cluster.ClusterManager;
import org.jivesoftware.openfire.cluster.NodeID;
import org.jivesoftware.openfire.container.BasicModule;
import org.jivesoftware.openfire.handler.PresenceUpdateHandler;
import org.jivesoftware.openfire.server.OutgoingSessionPromise;
import org.jivesoftware.openfire.session.*;
import org.jivesoftware.util.ConcurrentHashSet;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.cache.Cache;
import org.jivesoftware.util.cache.CacheFactory;
import org.xmpp.packet.*;
import java.util.*;
import java.util.concurrent.locks.Lock;
/**
* Routing table that stores routes to client sessions, outgoing server sessions
* and components. As soon as a user authenticates with the server its client session
* will be added to the routing table. Whenever the client session becomes available
* or unavailable the routing table will be updated too.<p>
*
* When running inside of a cluster the routing table will also keep references to routes
* hosted in other cluster nodes. A {@link RemotePacketRouter} will be used to route packets
* to routes hosted in other cluster nodes.<p>
*
* Failure to route a packet will end up invoking {@link IQRouter#routingFailed(JID, Packet)},
* {@link MessageRouter#routingFailed(JID, Packet)} or {@link PresenceRouter#routingFailed(JID, Packet)}
* depending on the type of the packet that could not be routed.
*
* @author Gaston Dombiak
*/
public class RoutingTableImpl extends BasicModule implements RoutingTable, ClusterEventListener {
/** Name of the cache that holds routes of authenticated (non-anonymous) client sessions. */
public static final String C2S_CACHE_NAME = "Routing Users Cache";
/** Name of the cache that holds routes of anonymous client sessions. */
public static final String ANONYMOUS_C2S_CACHE_NAME = "Routing AnonymousUsers Cache";
/** Name of the cache that holds routes of outgoing sessions to remote servers. */
public static final String S2S_CACHE_NAME = "Routing Servers Cache";
/** Name of the cache that holds routes of components. */
public static final String COMPONENT_CACHE_NAME = "Routing Components Cache";
/**
* Cache (unlimited, never expire) that holds outgoing sessions to remote servers from this server.
* Key: server domain, Value: nodeID (as a byte array) of the node hosting the session
*/
private Cache<String, byte[]> serversCache;
/**
* Cache (unlimited, never expire) that holds components connected to the server.
* Key: component domain, Value: set of nodeIDs hosting the component
*/
private Cache<String, Set<NodeID>> componentsCache;
/**
* Cache (unlimited, never expire) that holds sessions of users that have authenticated with the server.
* Key: full JID, Value: {nodeID, available/unavailable}
*/
private Cache<String, ClientRoute> usersCache;
/**
* Cache (unlimited, never expire) that holds sessions of anonymous users that have authenticated with the server.
* Key: full JID, Value: {nodeID, available/unavailable}
*/
private Cache<String, ClientRoute> anonymousUsersCache;
/**
* Cache (unlimited, never expire) that holds the list of connected resources of authenticated users
* (includes anonymous).
* Key: bare JID, Value: collection of full JIDs of the user
*/
private Cache<String, Collection<String>> usersSessions;
/** Domain of this server; compared against the domain of a JID to detect local addresses. */
private String serverName;
/** Server instance; used to obtain the ID of this node. Not assigned in the constructor. */
private XMPPServer server;
/** Routes (client sessions, outgoing server sessions, components) hosted in this JVM. */
private LocalRoutingTable localRoutingTable;
/** Router used to reach routes hosted in other cluster nodes; may be null, so it is checked before use. */
private RemotePacketRouter remotePacketRouter;
// NOTE(review): the three routers below are presumably the ones notified through
// routingFailed(JID, Packet) when a packet cannot be routed -- confirm against their usages.
private IQRouter iqRouter;
private MessageRouter messageRouter;
private PresenceRouter presenceRouter;
/** Consulted for direct presences when deciding whether to deliver to an unavailable route. */
private PresenceUpdateHandler presenceUpdateHandler;
/**
 * Creates the routing table module. Obtains from {@link CacheFactory} the caches that
 * back server, component and client routes, and creates the table that keeps the
 * routes hosted in this JVM.
 */
public RoutingTableImpl() {
    super("Routing table");

    // Caches are requested in a fixed order: servers, components, users, anonymous users
    // and finally the per-user session lists.
    this.serversCache = CacheFactory.createCache(S2S_CACHE_NAME);
    this.componentsCache = CacheFactory.createCache(COMPONENT_CACHE_NAME);
    this.usersCache = CacheFactory.createCache(C2S_CACHE_NAME);
    this.anonymousUsersCache = CacheFactory.createCache(ANONYMOUS_C2S_CACHE_NAME);
    this.usersSessions = CacheFactory.createCache("Routing User Sessions");

    // Routes whose handler lives in this JVM
    this.localRoutingTable = new LocalRoutingTable();
}
/**
 * Registers an outgoing server session as the route for a remote domain.
 * <p>
 * The session is stored in the local routing table under the domain of {@code route},
 * and the servers cache entry for that domain is set to the ID of this node.
 *
 * @param route address whose domain identifies the remote server.
 * @param destination outgoing server session that will handle packets for that domain.
 */
public void addServerRoute(JID route, LocalOutgoingServerSession destination) {
    String address = route.getDomain();
    localRoutingTable.addRoute(address, destination);
    Lock lock = CacheFactory.getLock(address, serversCache);
    // Acquire the lock before entering the try block: if lock() itself fails, the
    // finally clause must not call unlock() on a lock that was never obtained.
    lock.lock();
    try {
        serversCache.put(address, server.getNodeID().toByteArray());
    }
    finally {
        lock.unlock();
    }
}
/**
 * Registers a component as the route for its domain.
 * <p>
 * The handler is stored in the local routing table under the domain of {@code route},
 * and the ID of this node is added to the set of nodes that the components cache keeps
 * for that domain (the set is created when the domain has no entry yet).
 *
 * @param route address whose domain identifies the component.
 * @param destination handler that will process packets sent to the component.
 */
public void addComponentRoute(JID route, RoutableChannelHandler destination) {
    String address = route.getDomain();
    localRoutingTable.addRoute(address, destination);
    Lock lock = CacheFactory.getLock(address, componentsCache);
    // Acquire the lock before entering the try block: if lock() itself fails, the
    // finally clause must not call unlock() on a lock that was never obtained.
    lock.lock();
    try {
        Set<NodeID> nodes = componentsCache.get(address);
        if (nodes == null) {
            nodes = new HashSet<NodeID>();
        }
        nodes.add(server.getNodeID());
        // Put the set back so the cache sees the updated value
        componentsCache.put(address, nodes);
    }
    finally {
        lock.unlock();
    }
}
/**
 * Registers a client session as the route for its full JID.
 * <p>
 * The session is stored in the local routing table, and a {@link ClientRoute} holding the
 * ID of this node and the session's current availability is written to the anonymous users
 * cache (anonymous sessions) or to the users cache (all other sessions).
 * <p>
 * When the address has a resource and either the session is unavailable or the cache entry
 * is new, the full JID is also recorded under the bare JID in the user sessions cache. For
 * anonymous sessions the entry is replaced by a single-element list; for other sessions the
 * full JID is added to the existing collection, which is created on demand.
 *
 * @param route full JID of the client session.
 * @param destination client session that will process packets sent to {@code route}.
 * @return true if the route cache reported no previous entry for the full JID.
 */
public boolean addClientRoute(JID route, LocalClientSession destination) {
    boolean added;
    boolean available = destination.getPresence().isAvailable();
    String fullJID = route.toString();
    localRoutingTable.addRoute(fullJID, destination);

    // Anonymous and authenticated sessions are tracked in separate caches
    boolean anonymous = destination.getAuthToken().isAnonymous();
    Cache<String, ClientRoute> routes = anonymous ? anonymousUsersCache : usersCache;

    Lock routeLock = CacheFactory.getLock(fullJID, routes);
    // Locks are acquired before their try blocks: if lock() itself fails, the finally
    // clause must not call unlock() on a lock that was never obtained.
    routeLock.lock();
    try {
        added = routes.put(fullJID, new ClientRoute(server.getNodeID(), available)) == null;
    }
    finally {
        routeLock.unlock();
    }

    // Add the session to the list of user sessions
    if (route.getResource() != null && (!available || added)) {
        String bareJID = route.toBareJID();
        Lock sessionsLock = CacheFactory.getLock(bareJID, usersSessions);
        sessionsLock.lock();
        try {
            if (anonymous) {
                // The entry for an anonymous user is replaced by a single-element list
                usersSessions.put(bareJID, Arrays.asList(fullJID));
            }
            else {
                Collection<String> jids = usersSessions.get(bareJID);
                if (jids == null) {
                    // Optimization - use different class depending on current setup
                    if (ClusterManager.isClusteringStarted()) {
                        jids = new HashSet<String>();
                    }
                    else {
                        jids = new ConcurrentHashSet<String>();
                    }
                }
                jids.add(fullJID);
                // Put the collection back so the cache sees the updated value
                usersSessions.put(bareJID, jids);
            }
        }
        finally {
            sessionsLock.unlock();
        }
    }
    return added;
}
/**
 * Delivers a message to every client session hosted in this JVM and, unless restricted
 * to local delivery, hands it to the remote packet router for broadcasting as well.
 *
 * @param packet message to broadcast.
 * @param onlyLocal true to skip the remote packet router and deliver only to local sessions.
 */
public void broadcastPacket(Message packet, boolean onlyLocal) {
    // Local delivery: every client session connected to this JVM processes the message
    for (ClientSession localSession : localRoutingTable.getClientRoutes()) {
        localSession.process(packet);
    }

    // Nothing else to do when only local delivery was requested or no remote router is set
    if (onlyLocal || remotePacketRouter == null) {
        return;
    }

    // Broadcast the message to client sessions connected to remote cluster nodes
    remotePacketRouter.broadcastPacket(packet);
}
public void routePacket(JID jid, Packet packet, boolean fromServer) throws PacketException {
boolean routed = false;
if (serverName.equals(jid.getDomain())) {
if (jid.getResource() == null) {
// Packet sent to a bare JID of a user
if (packet instanceof Message) {
// Find best route of local user
routed = routeToBareJID(jid, (Message) packet);
}
else {
throw new PacketException("Cannot route packet of type IQ or Presence to bare JID: " + packet);
}
}
else {
// Packet sent to local user (full JID)
ClientRoute clientRoute = usersCache.get(jid.toString());
if (clientRoute == null) {
clientRoute = anonymousUsersCache.get(jid.toString());
}
if (clientRoute != null) {
if (!clientRoute.isAvailable() && routeOnlyAvailable(packet, fromServer) &&
!presenceUpdateHandler.hasDirectPresence(packet.getTo(), packet.getFrom())) {
// Packet should only be sent to available sessions and the route is not available
routed = false;
}
else {
if (server.getNodeID().equals(clientRoute.getNodeID())) {
// This is a route to a local user hosted in this node
try {
localRoutingTable.getRoute(jid.toString()).process(packet);
routed = true;
} catch (UnauthorizedException e) {
Log.error(e);
}
}
else {
// This is a route to a local user hosted in other node
if (remotePacketRouter != null) {
routed = remotePacketRouter
.routePacket(clientRoute.getNodeID().toByteArray(), jid, packet);
}
}
}
}
}
}
else if (jid.getDomain().contains(serverName) && hasComponentRoute(jid)) {
// Packet sent to component hosted in this server
// First check if the component is being hosted in this JVM
RoutableChannelHandler route = localRoutingTable.getRoute(jid.getDomain());
if (route != null) {
try {
route.process(packet);
routed = true;
} catch (UnauthorizedException e) {
Log.error(e);
}
}
else {
// Check if other cluster nodes are hosting this component
Set<NodeID> nodes = componentsCache.get(jid.getDomain());
if (nodes != null) {
for (NodeID nodeID : nodes) {
if (server.getNodeID().equals(nodeID)) {
// This is a route to a local component hosted in this node (route
// could have been added after our previous check)
try {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -