📄 hmmpfam.c
字号:
/* get their OK codes. */ PVMConfirmSlaves(slave_tid, nslaves); SQD_DPRINTF1(("Slaves confirm that they're ok...\n")); /* Load the slaves. * For efficiency reasons, we don't want the master to * load HMMs from disk until she absolutely needs them. */ for (nhmm = 0; nhmm < nslaves && nhmm < hmmfp->ssi->nprimary; nhmm++) { pvm_initsend(PvmDataDefault); pvm_pkint(&nhmm, 1, 1); /* side effect: also tells him what number he is. */ pvm_send(slave_tid[nhmm], HMMPVM_WORK); hmmlist[nhmm] = nhmm; } SQD_DPRINTF1(("%d slaves are loaded\n", nhmm)); /* Receive/send loop */ for (; nhmm < hmmfp->ssi->nprimary; nhmm++) { /* check slaves before blocking */ PVMCheckSlaves(slave_tid, nslaves); /* receive output */ SQD_DPRINTF1(("Waiting for a slave to give me output...\n")); pvm_recv(-1, HMMPVM_RESULTS); pvm_upkint(&slaveidx, 1, 1); /* # of slave who's sending us stuff */ pvm_upkstr(slavename); /* name of HMM that slave did */ pvm_upkfloat(&sc, 1, 1); /* score */ pvm_upkdouble(&pvalue, 1, 1); /* P-value */ pvm_upkint(&sent_trace, 1, 1); /* TRUE if trace is coming */ tr = (sent_trace) ? PVMUnpackTrace() : NULL; SQD_DPRINTF1(("Slave %d finished %s for me...\n", slaveidx, slavename)); /* send new work */ pvm_initsend(PvmDataDefault); pvm_pkint(&nhmm, 1, 1); pvm_send(slave_tid[slaveidx], HMMPVM_WORK); SQD_DPRINTF1(("Assigned %d -> slave %d\n", nhmm, slaveidx)); /* process output */ /* 1b. Store scores/pvalue for each HMM aligned to this sequence, overall */ SQD_DPRINTF1(("%15s : %2d : %f\n", slavename, slaveidx, sc)); if (sent_trace) { /* now load the HMM, because the hit is significant */ HMMFilePositionByIndex(hmmfp, hmmlist[slaveidx]); if (!HMMFileRead(hmmfp, &hmm)) { pvm_exit(); Die("Unexpected failure to read HMM file %s", hmmfile); } if (hmm == NULL) { pvm_exit(); Die("HMM file %s may be corrupt; parse failed", hmmfile); } P7Logoddsify(hmm, TRUE); if (! SetAutocuts(thresh, hmm)) Die("HMM %s did not contain your GA, NC, or TC cutoffs", hmm->name); PostprocessSignificantHit(ghit, dhit, tr, hmm, dsq, sqinfo->len, sqinfo->name, sqinfo->flags & SQINFO_ACC ? sqinfo->acc : NULL, sqinfo->flags & SQINFO_DESC ? sqinfo->desc : NULL, do_forward, sc, do_null2, thresh, TRUE); /* TRUE -> hmmpfam mode */ FreePlan7(hmm); P7FreeTrace(tr); } hmmlist[slaveidx] = nhmm; } /* Collect the output. all n slaves are still working, so wait for them. */ for (slave = 0; slave < nslaves && slave < nhmm; slave++) { /* don't check slaves (they're exiting normally); window of vulnerability here to slave crashes */ /* receive output */ pvm_recv(-1, HMMPVM_RESULTS); pvm_upkint(&slaveidx, 1, 1); /* slave who's sending us stuff */ pvm_upkstr(slavename); pvm_upkfloat(&sc, 1, 1); /* one score */ pvm_upkdouble(&pvalue, 1, 1); /* P-value */ pvm_upkint(&sent_trace, 1, 1); /* TRUE if trace is coming */ tr = (sent_trace) ? PVMUnpackTrace() : NULL; /* process output */ SQD_DPRINTF1(("%15s : %2d : %f\n", slavename, slaveidx, sc)); if (sent_trace) { /* now load the HMM, because the hit is significant */ HMMFilePositionByIndex(hmmfp, hmmlist[slaveidx]); if (!HMMFileRead(hmmfp, &hmm)) { pvm_exit(); Die("Unexpected failure to read HMM file %s", hmmfile);} if (hmm == NULL) { pvm_exit(); Die("HMM file %s may be corrupt; parse failed", hmmfile); } P7Logoddsify(hmm, TRUE); if (! SetAutocuts(thresh, hmm)) Die("HMM %s did not contain your GA, NC, or TC cutoffs", hmm->name); PostprocessSignificantHit(ghit, dhit, tr, hmm, dsq, sqinfo->len, sqinfo->name, NULL, NULL, /* won't need acc or desc even if we have 'em */ do_forward, sc, do_null2, thresh, TRUE); /* TRUE -> hmmpfam mode */ FreePlan7(hmm); P7FreeTrace(tr); } /* send cleanup/shutdown flag */ pvm_initsend(PvmDataDefault); msg = -1; pvm_pkint(&msg, 1, 1); pvm_send(slave_tid[slaveidx], HMMPVM_WORK); } /* Cleanup; quit the VM; and return */ free(slave_tid); free(hmmlist); free(dsq); pvm_exit(); *ret_nhmm = nhmm; return;}#endif /*HMMER_PVM*/#ifdef HMMER_THREADS/***************************************************************** * POSIX threads implementation. * * API: * workpool_start() (makes a workpool_s structure. Starts calculations.) * workpool_stop() (waits for threads to finish.) * workpool_free() (destroys the structure) * * Threads: * worker_thread() (the actual parallelized worker thread). *****************************************************************//* Function: workpool_start() * Date: SRE, Mon Sep 28 11:10:58 1998 [St. Louis] * * Purpose: Initialize a workpool_s structure, and return it. * * Args: hmmfile - name of HMM file * hmmfp - open HMM file, at start * dsq - ptr to sequence to search * seqname - ptr to name of dsq * L - length of dsq * do_forward - TRUE to score using Forward * do_null2 - TRUE to apply null2 ad hoc correction * threshold - evalue/score threshold settings * ghit - per-seq hit list * dhit - per-domain hit list * num_threads- number of worker threads to run. * * Returns: ptr to struct workpool_s. * Caller must wait for threads to finish with workpool_stop(), * then free the structure with workpool_free(). */static struct workpool_s *workpool_start(char *hmmfile, HMMFILE *hmmfp, char *dsq, char *seqname, int L, int do_forward, int do_null2, struct threshold_s *thresh, struct tophit_s *ghit, struct tophit_s *dhit, int num_threads){ struct workpool_s *wpool; pthread_attr_t attr; int i; int rtn; wpool = MallocOrDie(sizeof(struct workpool_s)); wpool->thread = MallocOrDie(num_threads * sizeof(pthread_t)); wpool->hmmfile = hmmfile; wpool->dsq = dsq; wpool->L = L; wpool->seqname = seqname; wpool->do_forward = do_forward; wpool->do_null2 = do_null2; wpool->thresh = thresh; wpool->hmmfp = hmmfp; wpool->nhmm = 0; if ((rtn = pthread_mutex_init(&(wpool->input_lock), NULL)) != 0) Die("pthread_mutex_init FAILED; %s\n", strerror(rtn)); wpool->ghit = ghit; wpool->dhit = dhit; if ((rtn = pthread_mutex_init(&(wpool->output_lock), NULL)) != 0) Die("pthread_mutex_init FAILED; %s\n", strerror(rtn)); wpool->num_threads= num_threads; /* Create slave threads. See comments in hmmcalibrate.c at * this step regarding concurrency and system scope. */ pthread_attr_init(&attr);#ifndef __sgi#ifdef HAVE_PTHREAD_ATTR_SETSCOPE pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);#endif#endif#ifdef HAVE_PTHREAD_SETCONCURRENCY pthread_setconcurrency(num_threads+1);#endif for (i = 0; i < num_threads; i++) if ((rtn = pthread_create(&(wpool->thread[i]), &attr, worker_thread , (void *) wpool)) != 0) Die("Failed to create thread %d; return code %d\n", i, rtn); pthread_attr_destroy(&attr); return wpool;}/* Function: workpool_stop() * Date: SRE, Thu Jul 16 11:20:16 1998 [St. Louis] * * Purpose: Waits for threads in a workpool to finish. * * Args: wpool -- ptr to the workpool structure * * Returns: (void) */static voidworkpool_stop(struct workpool_s *wpool){ int i; /* wait for threads to stop */ for (i = 0; i < wpool->num_threads; i++) if (pthread_join(wpool->thread[i],NULL) != 0) Die("pthread_join failed"); return;}/* Function: workpool_free() * Date: SRE, Thu Jul 16 11:26:27 1998 [St. Louis] * * Purpose: Free a workpool_s structure, after the threads * have finished. * * Args: wpool -- ptr to the workpool. * * Returns: (void) */static voidworkpool_free(struct workpool_s *wpool){ free(wpool->thread); free(wpool); return;}/* Function: worker_thread() * Date: SRE, Mon Sep 28 10:48:29 1998 [St. Louis] * * Purpose: The procedure executed by the worker threads. * * Args: ptr - (void *) that is recast to a pointer to * the workpool. * * Returns: (void *) */void *worker_thread(void *ptr){ struct workpool_s *wpool; /* our working threads structure */ struct plan7_s *hmm; /* an HMM to search with */ struct p7trace_s *tr; /* traceback from an alignment */ float sc; /* score of an alignment */ int rtn; /* a return code from pthreads lib */ double pvalue; /* P-value of score */ double evalue; /* E-value of a score */ struct threshold_s thresh; /* a local copy of thresholds */ wpool = (struct workpool_s *) ptr; /* Because we might dynamically change the thresholds using * Pfam GA/NC/TC cutoffs, we make a local copy of the threshold * structure in this thread. */ thresh.globT = wpool->thresh->globT; thresh.globE = wpool->thresh->globE; thresh.domT = wpool->thresh->domT; thresh.domE = wpool->thresh->domE; thresh.autocut = wpool->thresh->autocut; thresh.Z = wpool->thresh->Z; for (;;) { /* 1. acquire lock on HMM input, and get * the next HMM to work on. */ /* acquire a lock */ if ((rtn = pthread_mutex_lock(&(wpool->input_lock))) != 0) Die("pthread_mutex_lock failure: %s\n", strerror(rtn)); wpool->nhmm++; if (! HMMFileRead(wpool->hmmfp, &hmm)) { /* we're done. release lock, exit thread */ if ((rtn = pthread_mutex_unlock(&(wpool->input_lock))) != 0) Die("pthread_mutex_unlock failure: %s\n", strerror(rtn)); pthread_exit(NULL); } SQD_DPRINTF1(("a thread is working on %s\n", hmm->name)); /* release the lock */ if ((rtn = pthread_mutex_unlock(&(wpool->input_lock))) != 0) Die("pthread_mutex_unlock failure: %s\n", strerror(rtn)); if (hmm == NULL) Die("HMM file %s may be corrupt or in incorrect format; parse failed", wpool->hmmfile); P7Logoddsify(hmm, !(wpool->do_forward)); if (!SetAutocuts(&thresh, hmm)) Die("HMM %s did not have the right GA, NC, or TC cutoffs", hmm->name); /* 2. We have an HMM in score form. * Score the sequence. */ if (P7ViterbiSize(wpool->L, hmm->M) <= RAMLIMIT) sc = P7Viterbi(wpool->dsq, wpool->L, hmm, &tr); else sc = P7SmallViterbi(wpool->dsq, wpool->L, hmm, &tr); /* The Forward score override (see comments in serial vers) */ if (wpool->do_forward) { sc = P7Forward(wpool->dsq, wpool->L, hmm, NULL); if (wpool->do_null2) sc -= TraceScoreCorrection(hmm, tr, wpool->dsq); } /* 3. Save the output in tophits structures, after acquiring a lock */ if ((rtn = pthread_mutex_lock(&(wpool->output_lock))) != 0) Die("pthread_mutex_lock failure: %s\n", strerror(rtn)); SQD_DPRINTF1(("model %s scores %f\n", hmm->name, sc)); pvalue = PValue(hmm, sc); evalue = thresh.Z ? (double) thresh.Z * pvalue : (double) wpool->nhmm * pvalue; if (sc >= thresh.globT && evalue <= thresh.globE) { PostprocessSignificantHit(wpool->ghit, wpool->dhit, tr, hmm, wpool->dsq, wpool->L, wpool->seqname, NULL, NULL, /* won't need seq's acc or desc */ wpool->do_forward, sc, wpool->do_null2, &thresh, TRUE); /* TRUE -> hmmpfam mode */ } if ((rtn = pthread_mutex_unlock(&(wpool->output_lock))) != 0) Die("pthread_mutex_unlock failure: %s\n", strerror(rtn)); P7FreeTrace(tr); FreePlan7(hmm); } /* end 'infinite' loop over HMMs in this thread */}#endif /* HMMER_THREADS */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -