📄 endpointdemuxlistener.java
字号:
this.listener = listener;
this.name = name;
synchronized(threadDispatcher) {
myAccount = threadDispatcher.newAccount(1, -1, this);
threadDispatcher.notify(); // makes a contender run earlier
}
Thread.yield(); // be nice, there's no hurry for us here.
}
class MessageFromSource {
Message msg;
ResourceAccount src;
MessageFromSource(Message msg, ResourceAccount src) {
this.msg = msg;
this.src = src;
}
}
/**
*
*/
public void unregistered() {
LinkedList oldQueue;
synchronized(threadDispatcher) {
keepGoing = false;
oldQueue = messageQueue;
messageQueue = new LinkedList();
threadDispatcher.notify(); // makes a contender run earlier
}
Thread.yield(); // be nice, there's no hurry for us now.
synchronized(messageDispatcher) {
// Explicitly retrieve and release each message in the queue
// so that the per-peer accounting is maintained.
int sz = oldQueue.size();
while (sz-- > 0) {
MessageFromSource mfs =
(MessageFromSource) oldQueue.removeFirst();
mfs.src.inNeed(false);
mfs.src.releaseQuantity(mfs.msg.getRawSize());
// Check src account idleness here. Idleness status is
// stable under messageDispatcher synchronization.
if (mfs.src.isIdle()) {
allSources.stickyCacheEntry(
(CacheEntry) mfs.src.getUserObject(),
false);
}
}
messageDispatcher.notify();
}
}
/**
* process one message and move on.
*/
public EndpointDemuxListener doOne()
{
MessageFromSource mfs;
// Dequeue a message and update the thread's account "need" status.
synchronized(threadDispatcher) {
mfs = deQueue();
myAccount.inNeed(messageQueue.size() != 0);
threadDispatcher.notify(); // makes a contender run earlier
}
// Release the resource to the message account and process.
// Msg can be null on occasions since we release the lock between
// picking a listener and dequeing...more than one thread
// could decide it has work, while there's a single
// message. Not too easy to avoid.
if (mfs != null) {
// We discount that message right now, because we have no idea
// what resources are going to be kept, freed, allocated in
// relation to that message or not, until the listener comes
// back. We cannot assume anything.
Message message;
synchronized(messageDispatcher) {
mfs.src.inNeed(false); // Make sure we won't get to keep it.
mfs.src.releaseQuantity(mfs.msg.getRawSize());
// Check idleness here. Idleness is stable under
// messageDispatcher synchronization.
if (mfs.src.isIdle()) {
allSources.stickyCacheEntry(
(CacheEntry) mfs.src.getUserObject(),
false);
}
message = mfs.msg;
messageDispatcher.notify();
}
try {
EndpointAddress srcAddr = message.getSourceAddress();
EndpointAddress dstAddr = message.getDestinationAddress();
listener.processIncomingMessage(message, srcAddr, dstAddr);
} catch (Throwable ignored) {
if (LOG.isEnabledFor(Priority.FATAL)) {
LOG.fatal("Uncaught Throwable in thread : " + Thread.currentThread().getName(), ignored);
}
}
}
ResourceAccount next;
synchronized(threadDispatcher) {
myAccount.inNeed(messageQueue.size() > 0);
next = myAccount.releaseItem();
if ((! keepGoing) && myAccount.isIdle()) {
// We have been laid off and it looks like all threads
// have returned. We can close the shop.
myAccount.close();
}
threadDispatcher.notify(); // makes a contender run earlier
}
if (next == null) return null;
return (EndpointDemuxListener) next.getUserObject();
}
class ListenerThread implements Runnable {
EndpointDemuxListener current;
ListenerThread(EndpointDemuxListener current) {
this.current = current;
new Thread(this).start();
}
public void run() {
while (current != null) {
current = current.doOne();
}
}
}
/**
* One new message arrived. Try to give a new thread for this
* message (this listener). Subsequently it will run other listeners.
* according to what the dispatcher says.
*/
public void processIncomingMessage(Message message,
EndpointAddress srcAddr,
EndpointAddress dstAddr)
{
if (! keepGoing) return;
ResourceAccount msgSrcAccount;
String srcAddrStr = srcAddr.toString();
synchronized(messageDispatcher) {
CacheEntry ce = allSources.getCacheEntry(srcAddrStr);
if (ce == null) {
// Cross-ref the cache entry as the cookie in the account.
// we'll need it to efficiently manipulate the purgeability
// of the cache entry. Each time we need the cache entry, it
// cost us a lookup. Rather do it just once.
// At first the user object in the account is null.
// We change it when we know what to set.
msgSrcAccount = (ResourceAccount)
messageDispatcher.newAccount(1, -1, null);
if (msgSrcAccount.getNbReserved() < 1) {
// That's bad ! We must get rid of some stale
// accounts. Purge 1/10 of the idle accounts.
msgSrcAccount.close();
allSources.purge(10);
msgSrcAccount = (ResourceAccount)
messageDispatcher.newAccount(1, -1, null);
}
allSources.put(srcAddrStr, msgSrcAccount);
ce = allSources.getCacheEntry(srcAddrStr);
msgSrcAccount.setUserObject(ce);
} else {
msgSrcAccount = (ResourceAccount) ce.getValue();
}
if (! msgSrcAccount.obtainQuantity(message.getRawSize())) {
// Too many backloged messages from there.
// discard right away.
if (LOG.isEnabledFor(Priority.INFO)) {
LOG.info("Peer exceeds queuing limits; msg discarded.");
}
} else {
// Now, we hold a message resource for that source, so it
// cannot be purged from the cache.
allSources.stickyCacheEntry(ce, true);
}
messageDispatcher.notify();
}
message.setSourceAddress(srcAddr);
message.setDestinationAddress(dstAddr);
boolean obtained;
synchronized(threadDispatcher) {
enQueue(new MessageFromSource(message, msgSrcAccount));
obtained = myAccount.obtainItem();
threadDispatcher.notify(); // makes a contender run earlier
}
Thread.yield(); // be nice, we're one pushing data into the system
if (obtained) {
new ListenerThread(this);
} else {
if (LOG.isEnabledFor(Priority.INFO)) {
LOG.info("Listener exceeds threads limits; msg waits.");
}
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -