⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 endpointdemuxlistener.java

📁 jxme的一些相关程序,主要是手机上程序开发以及手机和计算机通信的一些程序资料,程序编译需要Ant支持
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
	this.listener = listener;
	this.name = name;
	
	synchronized(threadDispatcher) {
	    myAccount = threadDispatcher.newAccount(1, -1, this);
	    threadDispatcher.notify(); // makes a contender run earlier
	}
	Thread.yield(); // be nice, there's no hurry for us here.
    }

    class MessageFromSource {
	Message msg;
	ResourceAccount src;
	MessageFromSource(Message msg, ResourceAccount src) {
	    this.msg = msg;
	    this.src = src;
	}
    }

    /**
     *  
     */
    public void unregistered() {
	LinkedList oldQueue;
	synchronized(threadDispatcher) {
	    keepGoing = false;
	    oldQueue = messageQueue;
	    messageQueue = new LinkedList();
	    threadDispatcher.notify(); // makes a contender run earlier
	}
	Thread.yield(); // be nice, there's no hurry for us now.

	synchronized(messageDispatcher) {

	    // Explicitly retrieve and release each message in the queue
	    // so that the per-peer accounting is maintained.
	    int sz = oldQueue.size();
	    while (sz-- > 0) {
		MessageFromSource mfs =
		    (MessageFromSource) oldQueue.removeFirst();
		mfs.src.inNeed(false);
		mfs.src.releaseQuantity(mfs.msg.getRawSize());
		// Check src account idleness here. Idleness status is
		// stable under messageDispatcher synchronization.
		if (mfs.src.isIdle()) {
		    allSources.stickyCacheEntry(
			       (CacheEntry) mfs.src.getUserObject(),
			       false);
		}
	    }
	    messageDispatcher.notify();
	}
    }
    
    /**
     * process one message and move on.
     */
    public EndpointDemuxListener doOne()
    {
	MessageFromSource mfs;

	// Dequeue a message and update the thread's account "need" status.
	synchronized(threadDispatcher) {
	    mfs = deQueue();
	    myAccount.inNeed(messageQueue.size() != 0);
	    threadDispatcher.notify(); // makes a contender run earlier
	}
	
	// Release the resource to the message account and process.

	// Msg can be null on occasions since we release the lock between
	// picking a listener and dequeing...more than one thread
	// could decide it has work, while there's a single
	// message. Not too easy to avoid.
	if (mfs != null) {
	    // We discount that message right now, because we have no idea
	    // what resources are going to be kept, freed, allocated in
	    // relation to that message or not, until the listener comes
	    // back. We cannot assume anything.
	    Message message;
	    synchronized(messageDispatcher) {
		mfs.src.inNeed(false); // Make sure we won't get to keep it.
		mfs.src.releaseQuantity(mfs.msg.getRawSize());
		// Check idleness here. Idleness is stable under
		// messageDispatcher synchronization.
		if (mfs.src.isIdle()) {
		    allSources.stickyCacheEntry(
			       (CacheEntry) mfs.src.getUserObject(),
			       false);
		}
		message = mfs.msg;
		messageDispatcher.notify();
	    }

	    try {
		EndpointAddress srcAddr = message.getSourceAddress();
		EndpointAddress dstAddr = message.getDestinationAddress();
		listener.processIncomingMessage(message, srcAddr, dstAddr);
	    } catch (Throwable ignored) {
		if (LOG.isEnabledFor(Priority.FATAL)) {
		    LOG.fatal("Uncaught Throwable in thread : " + Thread.currentThread().getName(), ignored);
		}
	    }
	}

	ResourceAccount next;
	synchronized(threadDispatcher) {
	    myAccount.inNeed(messageQueue.size() > 0);
	    next = myAccount.releaseItem();
	    if ((! keepGoing) && myAccount.isIdle()) {

		// We have been laid off and it looks like all threads
		// have returned. We can close the shop.
		myAccount.close();
	    }
	    threadDispatcher.notify(); // makes a contender run earlier
	}

	if (next == null) return null;
	return (EndpointDemuxListener) next.getUserObject();
    }
    
    class ListenerThread implements Runnable {
	EndpointDemuxListener current;
	ListenerThread(EndpointDemuxListener current) {
	    this.current = current;
	    new Thread(this).start();
	}
	public void run() {
	    while (current != null) {
		current = current.doOne();
	    }
	}
    }

    /**
     * One new message arrived. Try to give a new thread for this
     * message (this listener). Subsequently it will run other listeners.
     * according to what the dispatcher says.
     */
    public void processIncomingMessage(Message message,
				       EndpointAddress srcAddr,
				       EndpointAddress dstAddr)
    {
	if (! keepGoing) return;

	ResourceAccount msgSrcAccount;
	String srcAddrStr = srcAddr.toString();

	synchronized(messageDispatcher) {

	    CacheEntry ce = allSources.getCacheEntry(srcAddrStr);

	    if (ce == null) {
		// Cross-ref the cache entry as the cookie in the account.
		// we'll need it to efficiently manipulate the purgeability
		// of the cache entry. Each time we need the cache entry, it
		// cost us a lookup. Rather do it just once.

		// At first the user object in the account is null.
		// We change it when we know what to set.
		msgSrcAccount = (ResourceAccount)
		    messageDispatcher.newAccount(1, -1, null);

		if (msgSrcAccount.getNbReserved() < 1) {
		    // That's bad ! We must get rid of some stale
		    // accounts. Purge 1/10 of the idle accounts.
		    msgSrcAccount.close();
		    allSources.purge(10);
		    msgSrcAccount = (ResourceAccount)
			messageDispatcher.newAccount(1, -1, null);
		}

		allSources.put(srcAddrStr, msgSrcAccount);

		ce = allSources.getCacheEntry(srcAddrStr);
		msgSrcAccount.setUserObject(ce);
	    } else {
		msgSrcAccount = (ResourceAccount) ce.getValue();
	    }

	    if (! msgSrcAccount.obtainQuantity(message.getRawSize())) {
		// Too many backloged messages from there.
		// discard right away.
		if (LOG.isEnabledFor(Priority.INFO)) {
		    LOG.info("Peer exceeds queuing limits; msg discarded.");
		}
	    } else {
		// Now, we hold a message resource for that source, so it
		// cannot be purged from the cache.
		allSources.stickyCacheEntry(ce, true);
	    }
	    messageDispatcher.notify();
	}

	message.setSourceAddress(srcAddr);
	message.setDestinationAddress(dstAddr);

	boolean obtained;
	synchronized(threadDispatcher) {
	    enQueue(new MessageFromSource(message, msgSrcAccount));
	    obtained = myAccount.obtainItem();
	    threadDispatcher.notify();  // makes a contender run earlier
	}

	Thread.yield(); // be nice, we're one pushing data into the system

	if (obtained) {
	    new ListenerThread(this);
	} else {
	    if (LOG.isEnabledFor(Priority.INFO)) {
		LOG.info("Listener exceeds threads limits; msg waits.");
	    }
	}
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -