message.c
来自「EM算法的改进」· C语言 代码 · 共 531 行 · 第 1/2 页
C
531 行
* **********************************************************************/void max_packets (void *f_data, void *f_result, int *f_length, MPI_Datatype *datatype){ double rd = ((REDUCE_PACKET *)f_result)->data; double dd = ((REDUCE_PACKET *)f_data)->data; double rw = ((REDUCE_PACKET *)f_result)->s_width; double dw = ((REDUCE_PACKET *)f_data)->s_width; double rn = ((REDUCE_PACKET *)f_result)->s_nsites; double dn = ((REDUCE_PACKET *)f_data)->s_nsites; /* Compare the data in the two packets. */ if ( dd < rd || (dd == rd && dw < rw) || (dd == rd && dw == rw && dn < rn) ) { /* Put the data into the result position. */ ((REDUCE_PACKET *)f_result)->data = dd; ((REDUCE_PACKET *)f_result)->s_width = dw; ((REDUCE_PACKET *)f_result)->s_nsites = dn; ((REDUCE_PACKET *)f_result)->ID = ((REDUCE_PACKET *)f_data)->ID; }}/********************************************************************** * * void reduce_across_models * * Do a reduction across an entire model. The model with the lowest * score in the 'logev' field gets propagated to every node. * Ties are resolved in favor of: * 1) lowest start width * 2) lowest start nsites * * This function is called by 'meme.c'. **********************************************************************/void reduce_across_models( MODEL *model, int alength){ static int init; static MPI_Datatype reduction_packet_type; static MPI_Op max_packets_op; REDUCE_PACKET a_packet, best_packet; double theta_matrix[MAXALPH * (MAXSITE + 1)]; double obs_matrix[MAXALPH * (MAXSITE + 1)]; /* Add up the number of EM iterations from each node. */ mpReduceAdd(&(model->iter)); /* Package the best model's sig together with the processor ID. */ a_packet.data = model->logev; a_packet.s_width = model->pw; a_packet.s_nsites = model->psites; a_packet.ID = (double)mpMyID(); /*fprintf(stdout, "%d: Prior score=%g\n", mpMyID(), a_packet.data);*/ /* Reduce across all the sig-ID pairs. */ /* Make sure we have a handle for the reduction function. */ if (init == 0) { init = 1; MPI_Type_contiguous(4, MPI_DOUBLE, &reduction_packet_type); MPI_Type_commit(&reduction_packet_type); MPI_Op_create(max_packets, TRUE, &max_packets_op); } /* Do the reduction. */ MPI_Allreduce((void *)&a_packet, (void *)&best_packet, 1, reduction_packet_type, max_packets_op, MPI_COMM_WORLD); /*fprintf(stdout, "%d: Winner=%d score=%g\n", mpMyID(), (int)best_packet.ID, best_packet.data);*/ /* The losers store their theta matrix and maxima pointers so they don't get overwritten in the upcoming broadcast. */ if (mpMyID() != (int)best_packet.ID) save_theta_ptrs(model, 1); /* Broadcast the best model from the winning ID. */ mpBroadcast((void *)model, sizeof(MODEL), (int)best_packet.ID); /*fprintf(stderr, "%d: Past model broadcast.\n", mpMyID()); fflush(stderr);*/ /* After the broadcast, the losers restore their original pointers. */ if (mpMyID() != (int)best_packet.ID) save_theta_ptrs(model, 0); /* The winner packages the theta matrix up into a linear array. */ if (mpMyID() == (int)best_packet.ID) theta_packer(model, theta_matrix, obs_matrix, 1, alength); /* Broadcast the theta and obs matrix arrays and discrete maxima. */ mpBroadcast((void *)theta_matrix, sizeof(double) * alength * model->w, (int)best_packet.ID); mpBroadcast((void *)obs_matrix, sizeof(double) * alength * model->w, (int)best_packet.ID); mpBroadcast((void *)model->maxima, sizeof(p_prob) * model->nsites_dis, (int)best_packet.ID); /*fprintf(stderr, "%d: Past theta broadcast.\n", mpMyID()); fflush(stderr);*/ /* Everyone else unpacks the theta matrix. */ if (mpMyID() != (int)best_packet.ID) theta_packer(model, theta_matrix, obs_matrix, 0, alength);}/********************************************************************** * * void max_s_packets * * Find the maximum from two s_point packets. * This function is used in the reduction across starting points. * * Ties are broken by taking first starting point in dataset. * **********************************************************************/void max_s_packets(void *f_data, void *f_result, int *f_length, MPI_Datatype *datatype){ int i_nsites0; double rs, ds, ri, di, rj, dj; /* Compare each s_point in the array. */ for (i_nsites0 = 0; i_nsites0 < *f_length; i_nsites0++) { /* Get the two scores. */ rs = ((S_POINT_PACKET *)f_result + i_nsites0)->score; ri = ((S_POINT_PACKET *)f_result + i_nsites0)->iseq; rj = ((S_POINT_PACKET *)f_result + i_nsites0)->ioff; ds = ((S_POINT_PACKET *)f_data + i_nsites0)->score; di = ((S_POINT_PACKET *)f_data + i_nsites0)->iseq; dj = ((S_POINT_PACKET *)f_data + i_nsites0)->ioff; /* Compare the two scores. */ if (ds > rs || (ds == rs && di < ri) || (ds == rs && di == ri && dj < rj)) { /* Copy the new data into the result location. */ ((S_POINT_PACKET *)f_result + i_nsites0)->score = ds; ((S_POINT_PACKET *)f_result + i_nsites0)->iseq = di; ((S_POINT_PACKET *)f_result + i_nsites0)->ioff = dj; } }}/********************************************************************** * * void reduce_across_s_points * * Do a reduction across an array of starting points. For each * position in the array, the starting point with the lowest value in * the 'score' field gets propagated to every node. * * This function is called by subseq7.c. **********************************************************************/void reduce_across_s_points( S_POINT *s_points, SAMPLE **samples, int n_nsites0, int n_starts){ static int init; static MPI_Datatype s_point_packet_type; static MPI_Op max_s_packets_op; int i_packet; S_POINT_PACKET packets[1000], best_packets[1000]; /* NOTE: sizeof(S_POINT) = 344 bytes sizeof(S_POINT_PACKET) = 16 bytes */ /* Package the scores into an array of packets. */ /* Don't need w0 & nsites0 because they're invariant across processors.*/ /* Don't want e_cons0 & cons0 because they are pointers (We recalculate e_cons0 and contents of cons0 later). */ for (i_packet = 0; i_packet < n_nsites0; i_packet++) { packets[i_packet].score = s_points[i_packet].score; packets[i_packet].iseq = (double)s_points[i_packet].iseq; packets[i_packet].ioff = (double)s_points[i_packet].ioff; } /* printf("BEFORE\n"); for (i_packet = 0; i_packet < n_nsites0; i_packet++) fprintf(stdout, "node %d, packet %d: score=%g iseq=%d ioff=%d\n", mpMyID(), i_packet, packets[i_packet].score, (int)packets[i_packet].iseq, (int)packets[i_packet].ioff); fflush(stdout); */ /* MPI */ /* Reduce across the list of packets. */ if (init == 0) { init = 1; MPI_Type_contiguous(3, MPI_DOUBLE, &s_point_packet_type); MPI_Type_commit(&s_point_packet_type); MPI_Op_create(max_s_packets, TRUE, &max_s_packets_op); } /* Do the reduction. */ MPI_Allreduce((void *)&packets, (void *)&best_packets, n_nsites0, s_point_packet_type, max_s_packets_op, MPI_COMM_WORLD); /* printf("AFTER\n"); for (i_packet = 0; i_packet < n_nsites0; i_packet++) fprintf(stdout, "node %d, packet %d: score=%g iseq=%d ioff=%d\n", mpMyID(), i_packet, best_packets[i_packet].score, (int)best_packets[i_packet].iseq, (int)best_packets[i_packet].ioff); fflush(stdout); */ /* Unpack the data. */ for (i_packet = 0; i_packet < n_nsites0; i_packet++) { s_points[i_packet].score = best_packets[i_packet].score; s_points[i_packet].iseq = (int)best_packets[i_packet].iseq; s_points[i_packet].ioff = (int)best_packets[i_packet].ioff; /* Set the e_cons0 fields to the proper memory locations. */ s_points[i_packet].e_cons0 = samples[s_points[i_packet].iseq]->res + s_points[i_packet].ioff; } /* Add up all of the values of n_starts. */ mpReduceAdd(&n_starts);}/**********************************************************************//* get_start_n_end Get the precalculated astarting and ending sequences and offsets.*//**********************************************************************/extern void get_start_n_end ( int *start_seq, /* starting sequence number */ int *start_off, /* offset in starting sequence */ int *end_seq, /* ending sequence number */ int *end_off /* offset in ending sequence */){ *start_seq = start_n_end->start_seq; *start_off = start_n_end->start_off; *end_seq = start_n_end->end_seq; *end_off = start_n_end->end_off;} /* get_start_n_end *//********************************************************************** * void print_model * * Print some of the contents of a model. * (Debugging only.) **********************************************************************/extern void print_model( char *label, MODEL *model){ fprintf(stderr, "%d: %s -- sig=%1.2e", mpMyID(), label, model->logev); fflush(stdout);}#endif /* PARALLEL */
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?