⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 threadpool.c

📁 电驴下载工具eMule0.47aVeryCD的源代码,可作分析测试也可用于P2P软件的开发研究.
💻 C
📖 第 1 页 / 共 4 页
字号:
        } else {

            tp->persistentJob = NULL;
            tp->lastJobId = 0;
            tp->shutdown = 0;
            tp->totalThreads = 0;
            tp->persistentThreads = 0;

            for( i = 0; i < tp->attr.minThreads; i++ ) {

                if( ( retCode = CreateWorker( tp ) ) != 0 ) {
                    break;
                }
            }
        }

        ithread_mutex_unlock( &tp->mutex );

        if( retCode != 0 ) {
            //clean up if the min threads could
            //not be created
            ThreadPoolShutdown( tp );
        }

        return retCode;
    }

/****************************************************************************
 * Function: ThreadPoolAddPersistent
 *
 *  Description:
 *      Adds a long term job to the thread pool.
 *      Job will be run as soon as possible.
 *      Call will block until job is scheduled.
 *  Parameters:
 *      tp - valid thread pool pointer
 *      job-> valid ThreadPoolJob pointer with following fields
 *          func - ThreadFunction to run
 *          arg - argument to function.
 *          priority - priority of job.
 *          free_function - function to use when freeing argument
 *  Returns:
 *      0 on success, nonzero on failure
 *      EOUTOFMEM not enough memory to add job.
 *      EMAXTHREADS not enough threads to add persistent job.
 *****************************************************************************/
    int ThreadPoolAddPersistent( ThreadPool * tp,
                                 ThreadPoolJob * job,
                                 int *jobId ) {
        int tempId = -1;

        ThreadPoolJob *temp = NULL;

        assert( tp != NULL );
        assert( job != NULL );

        if( ( tp == NULL ) || ( job == NULL ) ) {
            return EINVAL;
        }

        if( jobId == NULL )
            jobId = &tempId;

        ( *jobId ) = INVALID_JOB_ID;

        ithread_mutex_lock( &tp->mutex );

        assert( ( job->priority == LOW_PRIORITY )
                || ( job->priority == MED_PRIORITY )
                || ( job->priority == HIGH_PRIORITY ) );

        //Create A worker if less than max threads running
        if( tp->totalThreads < tp->attr.maxThreads ) {
            CreateWorker( tp ); // MTY this gets stuck
        } else {
            //if there is more than one worker thread
            //available then schedule job, otherwise fail
            if( ( tp->totalThreads - tp->persistentThreads ) - 1 == 0 ) {
                ithread_mutex_unlock( &tp->mutex );
                return EMAXTHREADS;
            }
        }

        temp = CreateThreadPoolJob( job, tp->lastJobId, tp );

        if( temp == NULL ) {
            ithread_mutex_unlock( &tp->mutex );
            return EOUTOFMEM;
        }

        tp->persistentJob = temp;

        //Notify a waiting thread

        ithread_cond_signal( &tp->condition );

        //wait until long job has been picked up
        while( tp->persistentJob != NULL ) {
            ithread_cond_wait( &tp->start_and_shutdown, &tp->mutex );
        }

        ( *jobId ) = tp->lastJobId++;
        ithread_mutex_unlock( &tp->mutex );
        return 0;
    }

/****************************************************************************
 * Function: ThreadPoolAdd
 *
 *  Description:
 *      Adds a job to the thread pool.
 *      Job will be run as soon as possible.
 *  Parameters:
 *      tp - valid thread pool pointer
 *      func - ThreadFunction to run
 *      arg - argument to function.
 *      priority - priority of job.
 *      jobId - id of job
 *      duration - whether or not this is a persistent thread
 *      free_function - function to use when freeing argument
 *  Returns:
 *      0 on success, nonzero on failure
 *      EOUTOFMEM if not enough memory to add job.
 *****************************************************************************/
    int ThreadPoolAdd( ThreadPool * tp,
                       ThreadPoolJob * job,
                       int *jobId ) {
        int rc = EOUTOFMEM;

        int tempId = -1;

        ThreadPoolJob *temp = NULL;

        assert( tp != NULL );
        assert( job != NULL );

        if( ( tp == NULL ) || ( job == NULL ) ) {
            return EINVAL;
        }

        ithread_mutex_lock( &tp->mutex );

        assert( ( job->priority == LOW_PRIORITY )
                || ( job->priority == MED_PRIORITY )
                || ( job->priority == HIGH_PRIORITY ) );

        if( jobId == NULL )
            jobId = &tempId;

        ( *jobId ) = INVALID_JOB_ID;

        temp = CreateThreadPoolJob( job, tp->lastJobId, tp );

        if( temp == NULL ) {
            ithread_mutex_unlock( &tp->mutex );
            return rc;
        }

        if( job->priority == HIGH_PRIORITY ) {
            if( ListAddTail( &tp->highJobQ, temp ) )
                rc = 0;
        } else if( job->priority == MED_PRIORITY ) {
            if( ListAddTail( &tp->medJobQ, temp ) )
                rc = 0;
        } else {
            if( ListAddTail( &tp->lowJobQ, temp ) )
                rc = 0;
        }

        //AddWorker if appropriate
        AddWorker( tp );

        //Notify a waiting thread
        if( rc == 0 ) {
            ithread_cond_signal( &tp->condition );

        } else {
            FreeThreadPoolJob( tp, temp );
        }

        ( *jobId ) = tp->lastJobId++;

        ithread_mutex_unlock( &tp->mutex );
        return rc;
    }

/****************************************************************************
 * Function: ThreadPoolRemove
 *
 *  Description:
 *      Removes a job from the thread pool.
 *      Can only remove jobs which are not
 *      currently running.
 *  Parameters:
 *      tp - valid thread pool pointer
 *      jobId - id of job
 *      ThreadPoolJob *out - space for removed job.
 *                           Can be null if not needed.
 *
 *  Returns:
 *      0 on success. INVALID_JOB_ID on failure.
 *****************************************************************************/
    int ThreadPoolRemove( ThreadPool * tp,
                          int jobId,
                          ThreadPoolJob * out ) {
        ThreadPoolJob *temp = NULL;
        int ret = INVALID_JOB_ID;
        ListNode *tempNode = NULL;
        ThreadPoolJob dummy;

        assert( tp != NULL );

        if( tp == NULL ) {
            return EINVAL;
        }

        if( out == NULL ) {
            out = &dummy;
        }

        dummy.jobId = jobId;

        ithread_mutex_lock( &tp->mutex );

        tempNode = ListFind( &tp->highJobQ, NULL, &dummy );

        if( tempNode ) {
            temp = ( ThreadPoolJob * ) tempNode->item;
            ( *out ) = ( *temp );
            ListDelNode( &tp->highJobQ, tempNode, 0 );
            FreeThreadPoolJob( tp, temp );
            ithread_mutex_unlock( &tp->mutex );
            return 0;
        }

        tempNode = ListFind( &tp->medJobQ, NULL, &dummy );

        if( tempNode ) {
            temp = ( ThreadPoolJob * ) tempNode->item;
            ( *out ) = ( *temp );
            ListDelNode( &tp->medJobQ, tempNode, 0 );
            FreeThreadPoolJob( tp, temp );
            ithread_mutex_unlock( &tp->mutex );
            return 0;
        }

        tempNode = ListFind( &tp->lowJobQ, NULL, &dummy );

        if( tempNode ) {
            temp = ( ThreadPoolJob * ) tempNode->item;
            ( *out ) = ( *temp );
            ListDelNode( &tp->lowJobQ, tempNode, 0 );
            FreeThreadPoolJob( tp, temp );
            ithread_mutex_unlock( &tp->mutex );
            return 0;
        }

        if( ( tp->persistentJob )
            && ( tp->persistentJob->jobId == jobId ) ) {
            ( *out ) = ( *tp->persistentJob );
            FreeThreadPoolJob( tp, tp->persistentJob );
            tp->persistentJob = NULL;
            ithread_mutex_unlock( &tp->mutex );
            return 0;
        }

        ithread_mutex_unlock( &tp->mutex );
        return ret;
    }

/****************************************************************************
 * Function: ThreadPoolGetAttr
 *
 *  Description:
 *      Gets the current set of attributes
 *      associated with the thread pool.
 *  Parameters:
 *      tp - valid thread pool pointer
 *      out - non null pointer to store attributes
 *  Returns:
 *      0 on success, nonzero on failure
 *      Always returns 0.
 *****************************************************************************/
    int ThreadPoolGetAttr( ThreadPool * tp,
                           ThreadPoolAttr * out ) {
        assert( tp != NULL );

        assert( out != NULL );

        if( ( tp == NULL ) || ( out == NULL ) ) {
            return EINVAL;
        }

        if( !tp->shutdown ) {
            ithread_mutex_lock( &tp->mutex );
        }

        ( *out ) = tp->attr;

        if( !tp->shutdown ) {
            ithread_mutex_unlock( &tp->mutex );
        }

        return 0;
    }

/****************************************************************************
 * Function: ThreadPoolSetAttr
 *
 *  Description:
 *      Sets the attributes for the thread pool.
 *      Only affects future calculations.
 *  Parameters:
 *      tp - valid thread pool pointer
 *      attr - pointer to attributes, null sets attributes to default.
 *  Returns:
 *      0 on success, nonzero on failure
 *      Returns INVALID_POLICY if policy can not be set.
 *****************************************************************************/
    int ThreadPoolSetAttr( ThreadPool * tp,
                           ThreadPoolAttr * attr ) {
        int retCode = 0;
        ThreadPoolAttr temp;
        int i = 0;

        assert( tp != NULL );

        if( tp == NULL ) {
            return EINVAL;
        }
        ithread_mutex_lock( &tp->mutex );

        if( attr != NULL ) {
            temp = ( *attr );
        } else {
            TPAttrInit( &temp );
        }

        if( SetPolicyType( temp.schedPolicy ) != 0 ) {
            ithread_mutex_unlock( &tp->mutex );
            return INVALID_POLICY;
        }

        tp->attr = ( temp );

        if( tp->totalThreads < tp->attr.minThreads )    //add threads
        {
            for( i = tp->totalThreads; i < tp->attr.minThreads; i++ ) {

                if( ( retCode = CreateWorker( tp ) ) != 0 ) {
                    break;
                }
            }
        }

        ithread_cond_signal( &tp->condition );  //signal changes 

        ithread_mutex_unlock( &tp->mutex );

        if( retCode != 0 ) {
            //clean up if the min threads could
            //not be created
            ThreadPoolShutdown( tp );
        }

        return retCode;
    }

/****************************************************************************
 * Function: ThreadPoolShutdown
 *
 *  Description:
 *      Shuts the thread pool down.
 *      Waits for all threads to finish.
 *      May block indefinitely if jobs do not
 *      exit.
 *  Parameters:
 *      tp - must be valid tp
 *  Returns:
 *      0 on success, nonzero on failure
 *      Always returns 0.
 *****************************************************************************/
    int ThreadPoolShutdown( ThreadPool * tp ) {

        ListNode *head = NULL;
        ThreadPoolJob *temp = NULL;

        assert( tp != NULL );

        if( tp == NULL ) {
            return EINVAL;
        }

        ithread_mutex_lock( &tp->mutex );

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -