parallel_example.cpp

来自「Boost provides free peer-reviewed portab」· C++ 代码 · 共 197 行

CPP
197
字号
// Copyright (C) 2005-2006 Matthias Troyer// Use, modification and distribution is subject to the Boost Software// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at// http://www.boost.org/LICENSE_1_0.txt)// An example of a parallel Monte Carlo simulation using some nodes to produce// data and others to aggregate the data#include <iostream>#include <boost/mpi.hpp>#include <boost/random/parallel.hpp>#include <boost/random.hpp>#include <boost/foreach.hpp>#include <iostream>#include <cstdlib>namespace mpi = boost::mpi;enum {sample_tag, sample_skeleton_tag, sample_broadcast_tag, quit_tag};void calculate_samples(int sample_length){  int num_samples = 100;  std::vector<double> sample(sample_length);  // setup communicator by splitting  mpi::communicator world;  mpi::communicator calculate_communicator = world.split(0);  unsigned int num_calculate_ranks = calculate_communicator.size();    // the master of the accumulaion ranks is the first of them, hence  // with a rank just one after the last calculation rank  int master_accumulate_rank = num_calculate_ranks;    // the master of the calculation ranks sends the skeleton of the sample   // to the master of the accumulation ranks  if (world.rank()==0)    world.send(master_accumulate_rank,sample_skeleton_tag,mpi::skeleton(sample));    // next we extract the content of the sample vector, to be used in sending  // the content later on    mpi::content sample_content = mpi::get_content(sample);    // now intialize the parallel random number generator    boost::lcg64 engine(        boost::random::stream_number = calculate_communicator.rank(),        boost::random::total_streams = calculate_communicator.size()      );        boost::variate_generator<boost::lcg64&,boost::uniform_real<> >     rng(engine,boost::uniform_real<>());      for (unsigned int i=0; i<num_samples/num_calculate_ranks+1;++i) {        // calculate sample by filling the vector with random numbers    // note that std::generate will not work since it takes the generator    // by value, and boost::ref cannot be used as a generator.    // boost::ref should be fixed so that it can be used as generator        BOOST_FOREACH(double& x, sample)      x = rng();        // send sample to accumulation ranks    // Ideally we want to do this as a broadcast with an inter-communicator     // between the calculation and accumulation ranks. MPI2 should support     // this, but here we present an MPI1 compatible solution.        // send content of sample to first (master) accumulation process        world.send(master_accumulate_rank,sample_tag,sample_content);        // gather some results from all calculation ranks        double local_result = sample[0];    std::vector<double> gathered_results(calculate_communicator.size());    mpi::all_gather(calculate_communicator,local_result,gathered_results);  }    // we are done: the master tells the accumulation ranks to quit  if (world.rank()==0)    world.send(master_accumulate_rank,quit_tag);}void accumulate_samples(){  std::vector<double> sample;  // setup the communicator for all accumulation ranks by splitting  mpi::communicator world;  mpi::communicator accumulate_communicator = world.split(1);  bool is_master_accumulate_rank = accumulate_communicator.rank()==0;  // the master receives the sample skeleton    if (is_master_accumulate_rank)     world.recv(0,sample_skeleton_tag,mpi::skeleton(sample));      // and broadcasts it to all accumulation ranks  mpi::broadcast(accumulate_communicator,mpi::skeleton(sample),0);    // next we extract the content of the sample vector, to be used in receiving  // the content later on    mpi::content sample_content = mpi::get_content(sample);    // accumulate until quit is called  double sum=0.;  while (true) {          // the accumulation master checks whether we should quit    if (world.iprobe(0,quit_tag)) {      world.recv(0,quit_tag);      for (int i=1; i<accumulate_communicator.size();++i)        accumulate_communicator.send(i,quit_tag);      std::cout << sum << "\n";      break; // We're done    }    // the otehr accumulation ranks check whether we should quit    if (accumulate_communicator.iprobe(0,quit_tag)) {      accumulate_communicator.recv(0,quit_tag);      std::cout << sum << "\n";      break; // We're done    }        // check whether the master accumulation rank has received a sample    if (world.iprobe(mpi::any_source,sample_tag)) {      BOOST_ASSERT(is_master_accumulate_rank);            // receive the content      world.recv(mpi::any_source,sample_tag,sample_content);            // now we need to braodcast      // the problam is we do not have a non-blocking broadcast that we could       // abort if we receive a quit message from the master. We thus need to      // first tell all accumulation ranks to start a broadcast. If the sample      // is small, we could just send the sample in this message, but here we      // optimize the code for large samples, so that the overhead of these      // sends can be ignored, and we count on an optimized broadcast      // implementation with O(log N) complexity      for (int i=1; i<accumulate_communicator.size();++i)        accumulate_communicator.send(i,sample_broadcast_tag);            // now broadcast the contents of the sample to all accumulate ranks      mpi::broadcast(accumulate_communicator,sample_content,0);            // and handle the sample by summing the appropriate value      sum += sample[0];    }      // the other accumulation ranks wait for a mesage to start the broadcast    if (accumulate_communicator.iprobe(0,sample_broadcast_tag)) {      BOOST_ASSERT(!is_master_accumulate_rank);            accumulate_communicator.recv(0,sample_broadcast_tag);            // receive broadcast of the sample contents      mpi::broadcast(accumulate_communicator,sample_content,0);            // and handle the sample            // and handle the sample by summing the appropriate value      sum += sample[accumulate_communicator.rank()];    }  }}int main(int argc, char** argv){  mpi::environment env(argc, argv);  mpi::communicator world;  // half of the processes generate, the others accumulate  // the sample size is just the number of accumulation ranks  if (world.rank() < world.size()/2)    calculate_samples(world.size()-world.size()/2);  else    accumulate_samples();  return 0;}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?