hmmsearch.c

来自「这是一个基于HMM 模型的生物多序列比对算法的linux实现版本。hmmer」· C语言 代码 · 共 1,185 行 · 第 1/3 页

C
1,185
字号
      pvm_recv(-1, HMMPVM_RESULTS);      pvm_upkint(&slaveidx, 1, 1);     /* # of slave who's sending us stuff */      pvm_upkfloat(&sc, 1, 1);         /* score   */      pvm_upkdouble(&pvalue, 1, 1);    /* P-value */      pvm_upkint(&sent_trace, 1, 1);   /* TRUE if trace is coming */      tr = (sent_trace) ? PVMUnpackTrace() : NULL;      SQD_DPRINTF1(("Slave %d finished %s for me...\n", slaveidx, namelist[slaveidx]));				/* send new work */      dsq = DigitizeSequence(seq, sqinfo.len);      if (do_xnu) XNU(dsq, sqinfo.len);      pvm_initsend(PvmDataDefault);      pvm_pkint(&nseq, 1, 1);      pvm_pkint(&(sqinfo.len), 1, 1);      pvm_pkbyte((char *) dsq, sqinfo.len+2, 1); /* safety of char * cast unknown. */      pvm_send(slave_tid[slaveidx], HMMPVM_WORK);      				/* process output */      if (sent_trace)	{	  sc = PostprocessSignificantHit(ghit, dhit, 					 tr, hmm, dsqlist[slaveidx], lenlist[slaveidx],					 namelist[slaveidx], acclist[slaveidx], desclist[slaveidx],					 do_forward, sc,					 do_null2,					 thresh,					 FALSE); /* FALSE-> not hmmpfam mode, hmmsearch mode */	  P7FreeTrace(tr);	}      AddToHistogram(histogram, sc);				/* record seq info for seq we just sent */      free(namelist[slaveidx]);      if (acclist[slaveidx]  != NULL) free(acclist[slaveidx]);      if (desclist[slaveidx] != NULL) free(desclist[slaveidx]);      free(dsqlist[slaveidx]);      dsqlist[slaveidx]  = dsq;      namelist[slaveidx] = Strdup(sqinfo.name);      acclist[slaveidx]  = (sqinfo.flags & SQINFO_ACC)  ? Strdup(sqinfo.acc)  : NULL;      desclist[slaveidx] = (sqinfo.flags & SQINFO_DESC) ? Strdup(sqinfo.desc) : NULL;      lenlist[slaveidx]  = sqinfo.len;      FreeSequence(seq, &sqinfo);     }  SQD_DPRINTF1(("End of receive/send loop\n"));  /* Collect the output. All n slaves are still working.   */  for (i = 0; i < nslaves && i < nseq; i++)    {				/* don't check slaves (they're exiting normally);				   window of vulnerability here to slave crashes */				/* receive output */      pvm_recv(-1, HMMPVM_RESULTS);      pvm_upkint(&slaveidx, 1, 1);     /* # of slave who's sending us stuff */      pvm_upkfloat(&sc, 1, 1);         /* score   */      pvm_upkdouble(&pvalue, 1, 1);    /* P-value */      pvm_upkint(&sent_trace, 1, 1);   /* TRUE if trace is coming */      tr = (sent_trace) ? PVMUnpackTrace() : NULL;      SQD_DPRINTF1(("Slave %d finished %s for me...\n", slaveidx, namelist[slaveidx]));      			/* process output */      if (sent_trace)	{	  sc = PostprocessSignificantHit(ghit, dhit, 					 tr, hmm, dsqlist[slaveidx], lenlist[slaveidx],					 namelist[slaveidx], acclist[slaveidx], desclist[slaveidx],					 do_forward, sc,					 do_null2,					 thresh,					 FALSE); /* FALSE-> not hmmpfam mode, hmmsearch mode */	  P7FreeTrace(tr);	}      SQD_DPRINTF2(("AddToHistogram: %s\t%f\n", namelist[slaveidx], sc));      AddToHistogram(histogram, sc);				/* free seq info */      free(namelist[slaveidx]);      if (acclist[slaveidx]  != NULL) free(acclist[slaveidx]);      if (desclist[slaveidx] != NULL) free(desclist[slaveidx]);      free(dsqlist[slaveidx]);      				/* send cleanup/shutdown flag to slave */      pvm_initsend(PvmDataDefault);      code = -1;      pvm_pkint(&code, 1, 1);      pvm_send(slave_tid[slaveidx], HMMPVM_WORK);    }    /* Cleanup; quit the VM; and return   */  free(slave_tid);  free(dsqlist);  free(namelist);  free(acclist);  free(desclist);  free(lenlist);  pvm_exit();  *ret_nseq = nseq;  return;}#else /* HMMER_PVM off, no PVM support, dummy function: */static voidmain_loop_pvm(struct plan7_s *hmm, SQFILE *sqfp, struct threshold_s *thresh, int do_forward,	      int do_null2, int do_xnu, struct histogram_s *histogram, 	      struct tophit_s *ghit, struct tophit_s *dhit, int *ret_nseq){  Die("No PVM support");}#endif /*HMMER_PVM*/#ifdef HMMER_THREADS/***************************************************************** * POSIX threads implementation. *  * API: *      workpool_start()   (makes a workpool_s structure. Starts calculations.) *      workpool_stop()    (waits for threads to finish.) *      workpool_free()    (destroys the structure) *       * Threads: *      worker_thread()    (the actual parallelized worker thread). *****************************************************************//* Function: main_loop_threaded * Date:     SRE, Wed Feb 27 11:38:11 2002 [St. Louis] * * Purpose:  Search an HMM against a sequence database. *           main loop for the threaded version. *            *           In:   HMM and open sqfile, plus options *           Out:  histogram, global hits list, domain hits list, nseq. * * Args:     hmm        - the HMM to search with.  *           sqfp       - open SQFILE for sequence database *           thresh     - score/evalue threshold info *           do_forward - TRUE to score using Forward()         *           do_null2   - TRUE to use ad hoc null2 score correction *           do_xnu     - TRUE to apply XNU mask *           num_threads- number of worker threads to start, >=1 *           histogram  - RETURN: score histogram *           ghit       - RETURN: ranked global scores *           dhit       - RETURN: ranked domain scores *           ret_nseq   - RETURN: actual number of seqs searched *            * Returns:  (void) */static voidmain_loop_threaded(struct plan7_s *hmm, SQFILE *sqfp, struct threshold_s *thresh, int do_forward,		   int do_null2, int do_xnu, int num_threads,		   struct histogram_s *histogram, 		   struct tophit_s *ghit, struct tophit_s *dhit, int *ret_nseq){  struct workpool_s *wpool;	/* pool of worker threads                  */  int    nseq;			/* number of sequences searched            */   wpool = workpool_start(hmm, sqfp, do_xnu, do_forward, do_null2, thresh,			 ghit, dhit, histogram, num_threads);  workpool_stop(wpool);  nseq = wpool->nseq;  workpool_free(wpool);  *ret_nseq = nseq;  return;}/* Function: workpool_start() * Date:     SRE, Mon Oct  5 16:44:53 1998 * * Purpose:  Initialize a workpool_s structure, and return it. * * Args:     sqfp       - open sequence file, at start *           do_xnu     - TRUE to apply XNU filter *           do_forward - TRUE to score using Forward *           do_null2   - TRUE to apply null2 ad hoc correction *           thresh     - score/evalue threshold info *           ghit       - per-seq hit list *           dhit       - per-domain hit list              *           hist       - histogram (alloced but empty) *           num_threads- number of worker threads to run. * * Returns:  ptr to struct workpool_s. *           Caller must wait for threads to finish with workpool_stop(), *           then free the structure with workpool_free(). */static struct workpool_s *workpool_start(struct plan7_s *hmm, SQFILE *sqfp, int do_xnu,	       int do_forward, int do_null2, struct threshold_s *thresh,	       struct tophit_s *ghit, struct tophit_s *dhit, 	       struct histogram_s *hist, int num_threads){  struct workpool_s *wpool;  pthread_attr_t    attr;  int i;  int rtn;  wpool             = MallocOrDie(sizeof(struct workpool_s));  wpool->thread     = MallocOrDie(num_threads * sizeof(pthread_t));  wpool->hmm        = hmm;  wpool->do_xnu     = do_xnu;  wpool->do_forward = do_forward;  wpool->do_null2   = do_null2;  wpool->thresh     = thresh;  wpool->sqfp       = sqfp;  wpool->nseq       = 0;  if ((rtn = pthread_mutex_init(&(wpool->input_lock), NULL)) != 0)    Die("pthread_mutex_init FAILED; %s\n", strerror(rtn));  wpool->ghit       = ghit;  wpool->dhit       = dhit;  wpool->hist       = hist;  if ((rtn = pthread_mutex_init(&(wpool->output_lock), NULL)) != 0)    Die("pthread_mutex_init FAILED; %s\n", strerror(rtn));  wpool->num_threads= num_threads;  /* Create slave threads. See comments in hmmcalibrate.c at this   * step, regarding concurrency, system scope, and portability   * amongst various UNIX implementations of pthreads.   */  pthread_attr_init(&attr);#ifndef __sgi#ifdef HAVE_PTHREAD_ATTR_SETSCOPE  pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);#endif#endif#ifdef HAVE_PTHREAD_SETCONCURRENCY  pthread_setconcurrency(num_threads+1);#endif  /* pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM); */  for (i = 0; i < num_threads; i++)    if ((rtn = pthread_create(&(wpool->thread[i]), &attr,			      worker_thread , (void *) wpool)) != 0)      Die("Failed to create thread %d; return code %d\n", i, rtn);  pthread_attr_destroy(&attr);  return wpool;}/* Function: workpool_stop() * Date:     SRE, Thu Jul 16 11:20:16 1998 [St. Louis] * * Purpose:  Waits for threads in a workpool to finish. * * Args:     wpool -- ptr to the workpool structure * * Returns:  (void) */static voidworkpool_stop(struct workpool_s *wpool){  int i;				/* wait for threads to stop */  for (i = 0; i < wpool->num_threads; i++)    if (pthread_join(wpool->thread[i],NULL) != 0)      Die("pthread_join failed");  return;}/* Function: workpool_free() * Date:     SRE, Thu Jul 16 11:26:27 1998 [St. Louis] * * Purpose:  Free a workpool_s structure, after the threads *           have finished. * * Args:     wpool -- ptr to the workpool. * * Returns:  (void) */static voidworkpool_free(struct workpool_s *wpool){  free(wpool->thread);  free(wpool);  return;}/* Function: worker_thread() * Date:     SRE, Mon Sep 28 10:48:29 1998 [St. Louis] * * Purpose:  The procedure executed by the worker threads. * * Args:     ptr  - (void *) that is recast to a pointer to *                  the workpool. * * Returns:  (void *) */void *worker_thread(void *ptr){  struct workpool_s *wpool;     /* our working threads structure   */  char  *seq;                   /* target sequence                 */  SQINFO sqinfo;		/* information assoc w/ seq        */  unsigned char     *dsq;       /* digitized sequence              */  struct dpmatrix_s *mx;        /* growable DP matrix              */  struct p7trace_s  *tr;        /* traceback from an alignment     */  float  sc;			/* score of an alignment           */  int    rtn;			/* a return code from pthreads lib */  double pvalue;		/* P-value of score                */  double evalue;		/* E-value of score                */  wpool = (struct workpool_s *) ptr;  /* Init with a small DP matrix; we'll grow in the sequence dimension   * overalloc'ing by 25 rows (residues).   */  mx = CreatePlan7Matrix(1, wpool->hmm->M, 25, 0);  for (;;) {    /* 1. acquire lock on sequence input, and get     *    the next seq to work on.     */				/* acquire a lock */    if ((rtn = pthread_mutex_lock(&(wpool->input_lock))) != 0)      Die("pthread_mutex_lock failure: %s\n", strerror(rtn));    if (! ReadSeq(wpool->sqfp, wpool->sqfp->format, &seq, &sqinfo))      {	/* we're done. release lock, exit thread */	if ((rtn = pthread_mutex_unlock(&(wpool->input_lock))) != 0)	  Die("pthread_mutex_unlock failure: %s\n", strerror(rtn));	FreePlan7Matrix(mx);	pthread_exit(NULL);      }    SQD_DPRINTF1(("a thread is working on %s\n", sqinfo.name));    wpool->nseq++;				/* release the lock */    if ((rtn = pthread_mutex_unlock(&(wpool->input_lock))) != 0)      Die("pthread_mutex_unlock failure: %s\n", strerror(rtn));    if (sqinfo.len == 0) continue; /* silent skip of len=0 seqs (wormpep!?!) */    dsq = DigitizeSequence(seq, sqinfo.len);    if (wpool->do_xnu) XNU(dsq, sqinfo.len);          /* 1. Recover a trace by Viterbi.     */    if (P7ViterbiSpaceOK(sqinfo.len, wpool->hmm->M, mx))      sc = P7Viterbi(dsq, sqinfo.len, wpool->hmm, mx, &tr);    else      sc = P7SmallViterbi(dsq, sqinfo.len, wpool->hmm, mx, &tr);    /* 2. If we're using Forward scores, do another DP     *    to get it; else, we already have a Viterbi score     *    in sc.     */    if (wpool->do_forward) {      sc  = P7Forward(dsq, sqinfo.len, wpool->hmm, NULL);      if (wpool->do_null2) sc -= TraceScoreCorrection(wpool->hmm, tr, dsq);    }    /* 3. Save the output in tophits and histogram structures, after acquiring a lock     */    if ((rtn = pthread_mutex_lock(&(wpool->output_lock))) != 0)      Die("pthread_mutex_lock failure: %s\n", strerror(rtn));    SQD_DPRINTF1(("seq %s scores %f\n", sqinfo.name, sc));    pvalue = PValue(wpool->hmm, sc);    evalue = wpool->thresh->Z ? (double) wpool->thresh->Z * pvalue : (double) wpool->nseq * pvalue;     if (sc >= wpool->thresh->globT && evalue <= wpool->thresh->globE)       { 	sc = PostprocessSignificantHit(wpool->ghit, wpool->dhit, 				       tr, wpool->hmm, dsq, sqinfo.len,				       sqinfo.name, 				       sqinfo.flags & SQINFO_ACC  ? sqinfo.acc  : NULL, 				       sqinfo.flags & SQINFO_DESC ? sqinfo.desc : NULL, 				       wpool->do_forward, sc,				       wpool->do_null2,				       wpool->thresh,				       FALSE); /* FALSE-> not hmmpfam mode, hmmsearch mode */      }    SQD_DPRINTF2(("AddToHistogram: %s\t%f\n", sqinfo.name, sc));    AddToHistogram(wpool->hist, sc);    if ((rtn = pthread_mutex_unlock(&(wpool->output_lock))) != 0)      Die("pthread_mutex_unlock failure: %s\n", strerror(rtn));    P7FreeTrace(tr);    FreeSequence(seq, &sqinfo);    free(dsq);  } /* end 'infinite' loop over seqs in this thread */}#else /*HMMER_THREADS off; no threads support; dummy stub: */static voidmain_loop_threaded(struct plan7_s *hmm, SQFILE *sqfp, struct threshold_s *thresh, int do_forward,		   int do_null2, int do_xnu, int num_threads,		   struct histogram_s *histogram, 		   struct tophit_s *ghit, struct tophit_s *dhit, int *ret_nseq){  Die("No threads support");}#endif /* HMMER_THREADS */

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?