📄 prmwait.c
字号:
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- *//* * The contents of this file are subject to the Mozilla Public * License Version 1.1 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of * the License at http://www.mozilla.org/MPL/ * * Software distributed under the License is distributed on an "AS * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or * implied. See the License for the specific language governing * rights and limitations under the License. * * The Original Code is the Netscape Portable Runtime (NSPR). * * The Initial Developer of the Original Code is Netscape * Communications Corporation. Portions created by Netscape are * Copyright (C) 1998-2000 Netscape Communications Corporation. All * Rights Reserved. * * Contributor(s): * * Alternatively, the contents of this file may be used under the * terms of the GNU General Public License Version 2 or later (the * "GPL"), in which case the provisions of the GPL are applicable * instead of those above. If you wish to allow use of your * version of this file only under the terms of the GPL and not to * allow others to use your version of this file under the MPL, * indicate your decision by deleting the provisions above and * replace them with the notice and other provisions required by * the GPL. If you do not delete the provisions above, a recipient * may use your version of this file under either the MPL or the * GPL. */#include "primpl.h"#include "pprmwait.h"#define _MW_REHASH_MAX 11static PRLock *mw_lock = NULL;static _PRGlobalState *mw_state = NULL;static PRIntervalTime max_polling_interval;#ifdef WINNTtypedef struct TimerEvent { PRIntervalTime absolute; void (*func)(void *); void *arg; LONG ref_count; PRCList links;} TimerEvent;#define TIMER_EVENT_PTR(_qp) \ ((TimerEvent *) ((char *) (_qp) - offsetof(TimerEvent, links)))struct { PRLock *ml; PRCondVar *new_timer; PRCondVar *cancel_timer; PRThread *manager_thread; PRCList timer_queue;} tm_vars;static PRStatus TimerInit(void);static void TimerManager(void *arg);static TimerEvent *CreateTimer(PRIntervalTime timeout, void (*func)(void *), void *arg);static PRBool CancelTimer(TimerEvent *timer);static void TimerManager(void *arg){ PRIntervalTime now; PRIntervalTime timeout; PRCList *head; TimerEvent *timer; PR_Lock(tm_vars.ml); while (1) { if (PR_CLIST_IS_EMPTY(&tm_vars.timer_queue)) { PR_WaitCondVar(tm_vars.new_timer, PR_INTERVAL_NO_TIMEOUT); } else { now = PR_IntervalNow(); head = PR_LIST_HEAD(&tm_vars.timer_queue); timer = TIMER_EVENT_PTR(head); if ((PRInt32) (now - timer->absolute) >= 0) { PR_REMOVE_LINK(head); /* * make its prev and next point to itself so that * it's obvious that it's not on the timer_queue. */ PR_INIT_CLIST(head); PR_ASSERT(2 == timer->ref_count); PR_Unlock(tm_vars.ml); timer->func(timer->arg); PR_Lock(tm_vars.ml); timer->ref_count -= 1; if (0 == timer->ref_count) { PR_NotifyAllCondVar(tm_vars.cancel_timer); } } else { timeout = (PRIntervalTime)(timer->absolute - now); PR_WaitCondVar(tm_vars.new_timer, timeout); } } } PR_Unlock(tm_vars.ml);}static TimerEvent *CreateTimer( PRIntervalTime timeout, void (*func)(void *), void *arg){ TimerEvent *timer; PRCList *links, *tail; TimerEvent *elem; timer = PR_NEW(TimerEvent); if (NULL == timer) { PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); return timer; } timer->absolute = PR_IntervalNow() + timeout; timer->func = func; timer->arg = arg; timer->ref_count = 2; PR_Lock(tm_vars.ml); tail = links = PR_LIST_TAIL(&tm_vars.timer_queue); while (links->prev != tail) { elem = TIMER_EVENT_PTR(links); if ((PRInt32)(timer->absolute - elem->absolute) >= 0) { break; } links = links->prev; } PR_INSERT_AFTER(&timer->links, links); PR_NotifyCondVar(tm_vars.new_timer); PR_Unlock(tm_vars.ml); return timer;}static PRBool CancelTimer(TimerEvent *timer){ PRBool canceled = PR_FALSE; PR_Lock(tm_vars.ml); timer->ref_count -= 1; if (timer->links.prev == &timer->links) { while (timer->ref_count == 1) { PR_WaitCondVar(tm_vars.cancel_timer, PR_INTERVAL_NO_TIMEOUT); } } else { PR_REMOVE_LINK(&timer->links); canceled = PR_TRUE; } PR_Unlock(tm_vars.ml); PR_DELETE(timer); return canceled; }static PRStatus TimerInit(void){ tm_vars.ml = PR_NewLock(); if (NULL == tm_vars.ml) { goto failed; } tm_vars.new_timer = PR_NewCondVar(tm_vars.ml); if (NULL == tm_vars.new_timer) { goto failed; } tm_vars.cancel_timer = PR_NewCondVar(tm_vars.ml); if (NULL == tm_vars.cancel_timer) { goto failed; } PR_INIT_CLIST(&tm_vars.timer_queue); tm_vars.manager_thread = PR_CreateThread( PR_SYSTEM_THREAD, TimerManager, NULL, PR_PRIORITY_NORMAL, PR_LOCAL_THREAD, PR_UNJOINABLE_THREAD, 0); if (NULL == tm_vars.manager_thread) { goto failed; } return PR_SUCCESS;failed: if (NULL != tm_vars.cancel_timer) { PR_DestroyCondVar(tm_vars.cancel_timer); } if (NULL != tm_vars.new_timer) { PR_DestroyCondVar(tm_vars.new_timer); } if (NULL != tm_vars.ml) { PR_DestroyLock(tm_vars.ml); } return PR_FAILURE;}#endif /* WINNT *//******************************************************************//******************************************************************//************************ The private portion *********************//******************************************************************//******************************************************************/void _PR_InitMW(void){#ifdef WINNT /* * We use NT 4's InterlockedCompareExchange() to operate * on PRMWStatus variables. */ PR_ASSERT(sizeof(PVOID) == sizeof(PRMWStatus)); TimerInit();#endif mw_lock = PR_NewLock(); PR_ASSERT(NULL != mw_lock); mw_state = PR_NEWZAP(_PRGlobalState); PR_ASSERT(NULL != mw_state); PR_INIT_CLIST(&mw_state->group_list); max_polling_interval = PR_MillisecondsToInterval(MAX_POLLING_INTERVAL);} /* _PR_InitMW */void _PR_CleanupMW(void){ PR_DestroyLock(mw_lock); mw_lock = NULL; if (mw_state->group) { PR_DestroyWaitGroup(mw_state->group); /* mw_state->group is set to NULL as a side effect. */ } PR_DELETE(mw_state);} /* _PR_CleanupMW */static PRWaitGroup *MW_Init2(void){ PRWaitGroup *group = mw_state->group; /* it's the null group */ if (NULL == group) /* there is this special case */ { group = PR_CreateWaitGroup(_PR_DEFAULT_HASH_LENGTH); if (NULL == group) goto failed_alloc; PR_Lock(mw_lock); if (NULL == mw_state->group) { mw_state->group = group; group = NULL; } PR_Unlock(mw_lock); if (group != NULL) (void)PR_DestroyWaitGroup(group); group = mw_state->group; /* somebody beat us to it */ }failed_alloc: return group; /* whatever */} /* MW_Init2 */static _PR_HashStory MW_AddHashInternal(PRRecvWait *desc, _PRWaiterHash *hash){ /* ** The entries are put in the table using the fd (PRFileDesc*) of ** the receive descriptor as the key. This allows us to locate ** the appropriate entry aqain when the poll operation finishes. ** ** The pointer to the file descriptor object is first divided by ** the natural alignment of a pointer in the belief that object ** will have at least that many zeros in the low order bits. ** This may not be a good assuption. ** ** We try to put the entry in by rehashing _MW_REHASH_MAX times. After ** that we declare defeat and force the table to be reconstructed. ** Since some fds might be added more than once, won't that cause ** collisions even in an empty table? */ PRIntn rehash = _MW_REHASH_MAX; PRRecvWait **waiter; PRUintn hidx = _MW_HASH(desc->fd, hash->length); PRUintn hoffset = 0; while (rehash-- > 0) { waiter = &hash->recv_wait; if (NULL == waiter[hidx]) { waiter[hidx] = desc; hash->count += 1;#if 0 printf("Adding 0x%x->0x%x ", desc, desc->fd); printf( "table[%u:%u:*%u]: 0x%x->0x%x\n", hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);#endif return _prmw_success; } if (desc == waiter[hidx]) { PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); /* desc already in table */ return _prmw_error; }#if 0 printf("Failing 0x%x->0x%x ", desc, desc->fd); printf( "table[*%u:%u:%u]: 0x%x->0x%x\n", hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);#endif if (0 == hoffset) { hoffset = _MW_HASH2(desc->fd, hash->length); PR_ASSERT(0 != hoffset); } hidx = (hidx + hoffset) % (hash->length); } return _prmw_rehash; } /* MW_AddHashInternal */static _PR_HashStory MW_ExpandHashInternal(PRWaitGroup *group){ PRRecvWait **desc; PRUint32 pidx, length; _PRWaiterHash *newHash, *oldHash = group->waiter; PRBool retry; _PR_HashStory hrv; static const PRInt32 prime_number[] = { _PR_DEFAULT_HASH_LENGTH, 179, 521, 907, 1427, 2711, 3917, 5021, 8219, 11549, 18911, 26711, 33749, 44771}; PRUintn primes = (sizeof(prime_number) / sizeof(PRInt32)); /* look up the next size we'd like to use for the hash table */ for (pidx = 0; pidx < primes; ++pidx) { if (prime_number[pidx] == oldHash->length) { break; } } /* table size must be one of the prime numbers */ PR_ASSERT(pidx < primes); /* if pidx == primes - 1, we can't expand the table any more */ while (pidx < primes - 1) { /* next size */ ++pidx; length = prime_number[pidx]; /* allocate the new hash table and fill it in with the old */ newHash = (_PRWaiterHash*)PR_CALLOC( sizeof(_PRWaiterHash) + (length * sizeof(PRRecvWait*))); if (NULL == newHash) { PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); return _prmw_error; } newHash->length = length; retry = PR_FALSE; for (desc = &oldHash->recv_wait; newHash->count < oldHash->count; ++desc) { PR_ASSERT(desc < &oldHash->recv_wait + oldHash->length); if (NULL != *desc) { hrv = MW_AddHashInternal(*desc, newHash); PR_ASSERT(_prmw_error != hrv); if (_prmw_success != hrv) { PR_DELETE(newHash); retry = PR_TRUE; break; } } } if (retry) continue; PR_DELETE(group->waiter); group->waiter = newHash; group->p_timestamp += 1; return _prmw_success; } PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); return _prmw_error; /* we're hosed */} /* MW_ExpandHashInternal */#ifndef WINNTstatic void _MW_DoneInternal( PRWaitGroup *group, PRRecvWait **waiter, PRMWStatus outcome){ /* ** Add this receive wait object to the list of finished I/O ** operations for this particular group. If there are other ** threads waiting on the group, notify one. If not, arrange ** for this thread to return. */#if 0 printf("Removing 0x%x->0x%x\n", *waiter, (*waiter)->fd);#endif (*waiter)->outcome = outcome; PR_APPEND_LINK(&((*waiter)->internal), &group->io_ready); PR_NotifyCondVar(group->io_complete); PR_ASSERT(0 != group->waiter->count); group->waiter->count -= 1; *waiter = NULL;} /* _MW_DoneInternal */#endif /* WINNT */static PRRecvWait **_MW_LookupInternal(PRWaitGroup *group, PRFileDesc *fd){ /* ** Find the receive wait object corresponding to the file descriptor. ** Only search the wait group specified. */ PRRecvWait **desc; PRIntn rehash = _MW_REHASH_MAX; _PRWaiterHash *hash = group->waiter; PRUintn hidx = _MW_HASH(fd, hash->length); PRUintn hoffset = 0; while (rehash-- > 0) { desc = (&hash->recv_wait) + hidx; if ((*desc != NULL) && ((*desc)->fd == fd)) return desc; if (0 == hoffset) { hoffset = _MW_HASH2(fd, hash->length); PR_ASSERT(0 != hoffset); } hidx = (hidx + hoffset) % (hash->length); } return NULL;} /* _MW_LookupInternal */#ifndef WINNTstatic PRStatus _MW_PollInternal(PRWaitGroup *group){ PRRecvWait **waiter; PRStatus rv = PR_FAILURE; PRInt32 count, count_ready; PRIntervalTime polling_interval; group->poller = PR_GetCurrentThread(); while (PR_TRUE) { PRIntervalTime now, since_last_poll; PRPollDesc *poll_list; while (0 == group->waiter->count) { PRStatus st; st = PR_WaitCondVar(group->new_business, PR_INTERVAL_NO_TIMEOUT); if (_prmw_running != group->state) { PR_SetError(PR_INVALID_STATE_ERROR, 0); goto aborted; } if (_MW_ABORTED(st)) goto aborted; } /* ** There's something to do. See if our existing polling list ** is large enough for what we have to do? */ while (group->polling_count < group->waiter->count) { PRUint32 old_count = group->waiter->count; PRUint32 new_count = PR_ROUNDUP(old_count, _PR_POLL_COUNT_FUDGE); PRSize new_size = sizeof(PRPollDesc) * new_count; PRPollDesc *old_polling_list = group->polling_list; PR_Unlock(group->ml); poll_list = (PRPollDesc*)PR_CALLOC(new_size); if (NULL == poll_list) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -