📄 hmmsearch.c
字号:
* a number from 0..nslaves-1. */ for (nseq = 0; nseq < nslaves; nseq++) { if (! ReadSeq(sqfp, sqfp->format, &seq, &sqinfo)) break; if (sqinfo.len == 0) { nseq--; continue; } dsq = DigitizeSequence(seq, sqinfo.len); if (do_xnu && Alphabet_type == hmmAMINO) XNU(dsq, sqinfo.len); pvm_initsend(PvmDataDefault); pvm_pkint(&nseq, 1, 1); pvm_pkint(&(sqinfo.len), 1, 1); pvm_pkbyte(dsq, sqinfo.len+2, 1); pvm_send(slave_tid[nseq], HMMPVM_WORK); SQD_DPRINTF1(("sent a dsq : %d bytes\n", sqinfo.len+2)); namelist[nseq] = Strdup(sqinfo.name); acclist[nseq] = (sqinfo.flags & SQINFO_ACC) ? Strdup(sqinfo.acc) : NULL; desclist[nseq] = (sqinfo.flags & SQINFO_DESC) ? Strdup(sqinfo.desc) : NULL; lenlist[nseq] = sqinfo.len; dsqlist[nseq] = dsq; FreeSequence(seq, &sqinfo); } SQD_DPRINTF1(("%d slaves are loaded\n", nseq)); /* main receive/send loop */ while (ReadSeq(sqfp, sqfp->format, &seq, &sqinfo)) { if (sqinfo.len == 0) { continue; } nseq++; /* check slaves before blocking */ PVMCheckSlaves(slave_tid, nslaves); /* receive output */ SQD_DPRINTF1(("Waiting for a slave to give me output...\n")); pvm_recv(-1, HMMPVM_RESULTS); pvm_upkint(&slaveidx, 1, 1); /* # of slave who's sending us stuff */ pvm_upkfloat(&sc, 1, 1); /* score */ pvm_upkdouble(&pvalue, 1, 1); /* P-value */ pvm_upkint(&sent_trace, 1, 1); /* TRUE if trace is coming */ tr = (sent_trace) ? PVMUnpackTrace() : NULL; SQD_DPRINTF1(("Slave %d finished %s for me...\n", slaveidx, namelist[slaveidx])); /* send new work */ dsq = DigitizeSequence(seq, sqinfo.len); if (do_xnu) XNU(dsq, sqinfo.len); pvm_initsend(PvmDataDefault); pvm_pkint(&nseq, 1, 1); pvm_pkint(&(sqinfo.len), 1, 1); pvm_pkbyte(dsq, sqinfo.len+2, 1); pvm_send(slave_tid[slaveidx], HMMPVM_WORK); /* process output */ if (sent_trace) { PostprocessSignificantHit(ghit, dhit, tr, hmm, dsqlist[slaveidx], lenlist[slaveidx], namelist[slaveidx], acclist[slaveidx], desclist[slaveidx], do_forward, sc, do_null2, thresh, FALSE); /* FALSE-> not hmmpfam mode, hmmsearch mode */ P7FreeTrace(tr); } AddToHistogram(histogram, sc); /* record seq info for seq we just sent */ free(namelist[slaveidx]); if (acclist[slaveidx] != NULL) free(acclist[slaveidx]); if (desclist[slaveidx] != NULL) free(desclist[slaveidx]); free(dsqlist[slaveidx]); dsqlist[slaveidx] = dsq; namelist[slaveidx] = Strdup(sqinfo.name); acclist[slaveidx] = (sqinfo.flags & SQINFO_ACC) ? Strdup(sqinfo.acc) : NULL; desclist[slaveidx] = (sqinfo.flags & SQINFO_DESC) ? Strdup(sqinfo.desc) : NULL; lenlist[slaveidx] = sqinfo.len; FreeSequence(seq, &sqinfo); } SQD_DPRINTF1(("End of receive/send loop\n")); /* Collect the output. All n slaves are still working. */ for (i = 0; i < nslaves && i < nseq; i++) { /* don't check slaves (they're exiting normally); window of vulnerability here to slave crashes */ /* receive output */ pvm_recv(-1, HMMPVM_RESULTS); pvm_upkint(&slaveidx, 1, 1); /* # of slave who's sending us stuff */ pvm_upkfloat(&sc, 1, 1); /* score */ pvm_upkdouble(&pvalue, 1, 1); /* P-value */ pvm_upkint(&sent_trace, 1, 1); /* TRUE if trace is coming */ tr = (sent_trace) ? PVMUnpackTrace() : NULL; SQD_DPRINTF1(("Slave %d finished %s for me...\n", slaveidx, namelist[slaveidx])); /* process output */ if (sent_trace) { PostprocessSignificantHit(ghit, dhit, tr, hmm, dsqlist[slaveidx], lenlist[slaveidx], namelist[slaveidx], acclist[slaveidx], desclist[slaveidx], do_forward, sc, do_null2, thresh, FALSE); /* FALSE-> not hmmpfam mode, hmmsearch mode */ P7FreeTrace(tr); } AddToHistogram(histogram, sc); /* free seq info */ free(namelist[slaveidx]); if (acclist[slaveidx] != NULL) free(acclist[slaveidx]); if (desclist[slaveidx] != NULL) free(desclist[slaveidx]); free(dsqlist[slaveidx]); /* send cleanup/shutdown flag to slave */ pvm_initsend(PvmDataDefault); code = -1; pvm_pkint(&code, 1, 1); pvm_send(slave_tid[slaveidx], HMMPVM_WORK); } /* Cleanup; quit the VM; and return */ free(slave_tid); free(dsqlist); free(namelist); free(acclist); free(desclist); free(lenlist); pvm_exit(); *ret_nseq = nseq; return;}#endif /* HMMER_PVM */#ifdef HMMER_THREADS/***************************************************************** * POSIX threads implementation. * * API: * workpool_start() (makes a workpool_s structure. Starts calculations.) * workpool_stop() (waits for threads to finish.) * workpool_free() (destroys the structure) * * Threads: * worker_thread() (the actual parallelized worker thread). *****************************************************************//* Function: workpool_start() * Date: SRE, Mon Oct 5 16:44:53 1998 * * Purpose: Initialize a workpool_s structure, and return it. * * Args: sqfp - open sequence file, at start * do_xnu - TRUE to apply XNU filter * do_forward - TRUE to score using Forward * do_null2 - TRUE to apply null2 ad hoc correction * thresh - score/evalue threshold info * ghit - per-seq hit list * dhit - per-domain hit list * hist - histogram (alloced but empty) * num_threads- number of worker threads to run. * * Returns: ptr to struct workpool_s. * Caller must wait for threads to finish with workpool_stop(), * then free the structure with workpool_free(). */static struct workpool_s *workpool_start(struct plan7_s *hmm, SQFILE *sqfp, int do_xnu, int do_forward, int do_null2, struct threshold_s *thresh, struct tophit_s *ghit, struct tophit_s *dhit, struct histogram_s *hist, int num_threads){ struct workpool_s *wpool; pthread_attr_t attr; int i; int rtn; wpool = MallocOrDie(sizeof(struct workpool_s)); wpool->thread = MallocOrDie(num_threads * sizeof(pthread_t)); wpool->hmm = hmm; wpool->do_xnu = do_xnu; wpool->do_forward = do_forward; wpool->do_null2 = do_null2; wpool->thresh = thresh; wpool->sqfp = sqfp; wpool->nseq = 0; if ((rtn = pthread_mutex_init(&(wpool->input_lock), NULL)) != 0) Die("pthread_mutex_init FAILED; %s\n", strerror(rtn)); wpool->ghit = ghit; wpool->dhit = dhit; wpool->hist = hist; if ((rtn = pthread_mutex_init(&(wpool->output_lock), NULL)) != 0) Die("pthread_mutex_init FAILED; %s\n", strerror(rtn)); wpool->num_threads= num_threads; /* Create slave threads. See comments in hmmcalibrate.c at this * step, regarding concurrency, system scope, and portability * amongst various UNIX implementations of pthreads. */ pthread_attr_init(&attr);#ifndef __sgi#ifdef HAVE_PTHREAD_ATTR_SETSCOPE pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);#endif#endif#ifdef HAVE_PTHREAD_SETCONCURRENCY pthread_setconcurrency(num_threads+1);#endif /* pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM); */ for (i = 0; i < num_threads; i++) if ((rtn = pthread_create(&(wpool->thread[i]), &attr, worker_thread , (void *) wpool)) != 0) Die("Failed to create thread %d; return code %d\n", i, rtn); pthread_attr_destroy(&attr); return wpool;}/* Function: workpool_stop() * Date: SRE, Thu Jul 16 11:20:16 1998 [St. Louis] * * Purpose: Waits for threads in a workpool to finish. * * Args: wpool -- ptr to the workpool structure * * Returns: (void) */static voidworkpool_stop(struct workpool_s *wpool){ int i; /* wait for threads to stop */ for (i = 0; i < wpool->num_threads; i++) if (pthread_join(wpool->thread[i],NULL) != 0) Die("pthread_join failed"); return;}/* Function: workpool_free() * Date: SRE, Thu Jul 16 11:26:27 1998 [St. Louis] * * Purpose: Free a workpool_s structure, after the threads * have finished. * * Args: wpool -- ptr to the workpool. * * Returns: (void) */static voidworkpool_free(struct workpool_s *wpool){ free(wpool->thread); free(wpool); return;}/* Function: worker_thread() * Date: SRE, Mon Sep 28 10:48:29 1998 [St. Louis] * * Purpose: The procedure executed by the worker threads. * * Args: ptr - (void *) that is recast to a pointer to * the workpool. * * Returns: (void *) */void *worker_thread(void *ptr){ struct workpool_s *wpool; /* our working threads structure */ char *seq; /* target sequence */ SQINFO sqinfo; /* information assoc w/ seq */ char *dsq; /* digitized sequence */ struct p7trace_s *tr; /* traceback from an alignment */ float sc; /* score of an alignment */ int rtn; /* a return code from pthreads lib */ double pvalue; /* P-value of score */ double evalue; /* E-value of score */ wpool = (struct workpool_s *) ptr; for (;;) { /* 1. acquire lock on sequence input, and get * the next seq to work on. */ /* acquire a lock */ if ((rtn = pthread_mutex_lock(&(wpool->input_lock))) != 0) Die("pthread_mutex_lock failure: %s\n", strerror(rtn)); if (! ReadSeq(wpool->sqfp, wpool->sqfp->format, &seq, &sqinfo)) { /* we're done. release lock, exit thread */ if ((rtn = pthread_mutex_unlock(&(wpool->input_lock))) != 0) Die("pthread_mutex_unlock failure: %s\n", strerror(rtn)); pthread_exit(NULL); } SQD_DPRINTF1(("a thread is working on %s\n", sqinfo.name)); wpool->nseq++; /* release the lock */ if ((rtn = pthread_mutex_unlock(&(wpool->input_lock))) != 0) Die("pthread_mutex_unlock failure: %s\n", strerror(rtn)); if (sqinfo.len == 0) continue; /* silent skip of len=0 seqs (wormpep!?!) */ dsq = DigitizeSequence(seq, sqinfo.len); if (wpool->do_xnu) XNU(dsq, sqinfo.len); /* 1. Recover a trace by Viterbi. */ if (P7ViterbiSize(sqinfo.len, wpool->hmm->M) <= RAMLIMIT) sc = P7Viterbi(dsq, sqinfo.len, wpool->hmm, &tr); else sc = P7SmallViterbi(dsq, sqinfo.len, wpool->hmm, &tr); /* 2. If we're using Forward scores, do another DP * to get it; else, we already have a Viterbi score * in sc. */ if (wpool->do_forward) sc = P7Forward(dsq, sqinfo.len, wpool->hmm, NULL); if (wpool->do_null2) sc -= TraceScoreCorrection(wpool->hmm, tr, dsq); /* 3. Save the output in tophits and histogram structures, after acquiring a lock */ if ((rtn = pthread_mutex_lock(&(wpool->output_lock))) != 0) Die("pthread_mutex_lock failure: %s\n", strerror(rtn)); SQD_DPRINTF1(("seq %s scores %f\n", sqinfo.name, sc)); pvalue = PValue(wpool->hmm, sc); evalue = wpool->thresh->Z ? (double) wpool->thresh->Z * pvalue : (double) wpool->nseq * pvalue; if (sc >= wpool->thresh->globT && evalue <= wpool->thresh->globE) { PostprocessSignificantHit(wpool->ghit, wpool->dhit, tr, wpool->hmm, dsq, sqinfo.len, sqinfo.name, sqinfo.flags & SQINFO_ACC ? sqinfo.acc : NULL, sqinfo.flags & SQINFO_DESC ? sqinfo.desc : NULL, wpool->do_forward, sc, wpool->do_null2, wpool->thresh, FALSE); /* FALSE-> not hmmpfam mode, hmmsearch mode */ } AddToHistogram(wpool->hist, sc); if ((rtn = pthread_mutex_unlock(&(wpool->output_lock))) != 0) Die("pthread_mutex_unlock failure: %s\n", strerror(rtn)); P7FreeTrace(tr); FreeSequence(seq, &sqinfo); free(dsq); } /* end 'infinite' loop over seqs in this thread */}#endif /* HMMER_THREADS */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -