Overview
Cluster-Delegator is a developer package to assist in writing C++ applications utilizing MPI on a cluster in a single-coordinator/many-worker (star topology) arrangement. The MPI interface is abstracted away. Instead, one constructs a program by writing methods for a class Process which define how to create jobs, how to work jobs to produce results, and how to process the results. The cluster-delegator machinery then takes care of all of the interprocess communication. Jobs and results are handled using the << and >> streaming operators, which package data structures (any data structure with a serialize method as described in the Boost serialization manual) into a message class.
Here is the "Hello World!" program:
#include "cluster-delegator.hpp"
#include <iostream>
#include <string>
#include <vector>
class Process : public Coordinator_Worker_Process {
public:
  void command_line ( int argcin, char * argvin [] ) {
    std::cout << "This is called by every process at start!\n";
  }
  void initialize ( void ) {
    std::cout << "This is called by the coordinator before jobs start!\n";
    current_job_number = 0;
    names = { "Shaun", "Pawel", "Ippei", "World" };
    total_number_of_jobs = names . size ();
  }
  int prepare ( Message & job_message ) {
    std::cout << "This is called by the coordinator to produce jobs!\n";
    if ( current_job_number == total_number_of_jobs ) return 1; // Code 1: No more jobs!
    job_message << names [ current_job_number ++ ]; return 0; // Code 0: A job was sent!
  }
  void work ( Message & result_message, Message const& job_message ) const {
    std::cout << "This method is called by a worker receiving a job!\n";
    std::string name; job_message >> name;
    std::string greeting = "Hello " + name + "!";
    result_message << greeting;
  }
  void accept ( Message const& result_message ) {
    std::cout << "This is called by the coordinator to process a result!\n";
    std::string data; result_message >> data;
    std::cout << data << "\n";
  }
  void finalize ( void ) {
    std::cout << "This is called by the coordinator when all jobs are done!\n";
  }
private:
  // Each process has its own copy of the member variables below:
  int current_job_number;
  int total_number_of_jobs;
  std::vector<std::string> names;
};
int main ( int argc, char * argv [] ) {
  delegator::Start ();
  delegator::Run < Process > (argc, argv);
  delegator::Stop ();
  return 0;
}
Here is one possible output of this "Hello World!" program with a 3-process MPI job (the interleaving of lines can differ from run to run):
> mpiexec -np 3 ./bin/helloworld
This is called by every process at start!
This is called by every process at start!
This is called by the coordinator before jobs start!
This is called by every process at start!
This is called by the coordinator to produce jobs!
This is called by the coordinator to produce jobs!
This is called by the coordinator to produce jobs!
This is called by the coordinator to produce jobs!
This method is called by a worker receiving a job!
This method is called by a worker receiving a job!
This is called by the coordinator to process a result!
Hello Pawel!
This is called by the coordinator to process a result!
Hello Ippei!
This is called by the coordinator to produce jobs!
This method is called by a worker receiving a job!
This method is called by a worker receiving a job!
This is called by the coordinator to process a result!
Hello Shaun!
This is called by the coordinator to process a result!
Hello World!
This is called by the coordinator when all jobs are done!
Acknowledgements
This software was developed in group discussions between Shaun Harker, Pawel Pilarczyk, and Ippei Obayashi in 2010. The current thread-based implementation is due to Shaun Harker.
The Coordinator-Worker scheme is described in Section 3 of the following paper:
P. Pilarczyk, Parallelization method for a continuous property, Foundations of Computational Mathematics, Vol. 10, No. 1 (2010), 93-114. DOI: 10.1007/s10208-009-9050-8.
Installation
The library comes in two flavors. Both are installed by default.
Header-only
There is a header-only version of the software, which can be used with the following include:
#include "cluster-delegator.hpp"
This is a single header file which contains the entire project. All functions are either templated or declared inline. This prevents duplicate symbol errors when linking together different translation units which both include this.
For backwards compatibility reasons, the header-only file is copied to two locations. Assuming an install prefix of /usr/local, these are:
/usr/local/include/cluster-delegator.hpp # Include the one here
/usr/local/include/delegator/delegator.h # Not this deprecated one
Header and library
The second flavor is the header/library approach. To use this approach, include
#include "cluster-delegator.h"
and link against libcluster-delegator. Both static and shared libraries are available.
The libraries should be in
/usr/local/lib/libcluster-delegator.a
/usr/local/lib/libcluster-delegator.so
Note: On Mac OS X, the .so will instead be .dylib.
Files
To recap, the installation will produce the following files:
/usr/local/include/cluster-delegator.hpp # single-file header-only library
/usr/local/include/delegator/delegator.h # single-file header-only library (deprecated)
/usr/local/include/cluster-delegator.h # declarations and template code only
/usr/local/lib/libcluster-delegator.a # static library
/usr/local/lib/libcluster-delegator.so # shared library
Dependencies
In order to use the software, the following are required:
- Boost 1.58 or later (compatibility of boost/serialization with C++11 types requires a version this recent)
- Open MPI
Mac OS X
- Get Homebrew (if you don’t already have it)
  ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
- For the latest stable version, type
  brew install shaunharker/tap/cluster-delegator
- For the repo version,
  - Get open-mpi
    brew install open-mpi
  - Get boost
    brew install boost
  - Get cluster-delegator
    git clone https://github.com/shaunharker/cluster-delegator.git
    cd cluster-delegator
    ./install.sh
Either of these approaches will result in the software being installed into
/usr/local/include/
If another directory is required (e.g. you don’t have privileges) use
./install --prefix=/path/to/install/folder
but be sure to include this path in the -I directives when you compile software which uses it.
Linux
For Linux, use a package manager, or else follow the directions on the Open MPI and Boost websites, to install the prerequisites. Then either download the tarball from GitHub (for the stable version) or clone the repo. In either case, run
./install --prefix=/path/to/install/folder
whereupon the cluster-delegator.hpp header file will be copied into
/path/to/install/folder/include/
One might also refer to the .travis.yml file, the scripts in the hidden .install directory of the project, and the log of the latest Travis CI build for this project.
A common problem is for a user-installed Boost either not to be found or to be shadowed by an out-of-date system install. This can be solved by setting the environment variable BOOST_ROOT to the path of the Boost installation (i.e. the path containing the include/boost and lib folders with the Boost header files and Boost library files). For example, in bash:
export BOOST_ROOT=/path/to/boost/install
./install --prefix=/path/to/install/folder
Another problem can occur when installations of different MPI implementations (e.g. Open MPI and MPICH) conflict. For instance, if CMake links against MPICH but mpiexec is the Open MPI flavor, then the behavior is undefined (it will probably hang).
If all else fails, write me at sharker@math.rutgers.edu.
The Scheme
In order to actually make something happen, the user needs to override methods from Coordinator_Worker_Process. To see how this works, we’ll describe the behavior of the algorithm called by Run<Process>():
- Of the N processes, N-1 are workers and 1 is a coordinator.
- The coordinator is responsible for preparing jobs and accepting results. This is accomplished via the methods prepare and accept.
- The workers are responsible for working jobs to produce results. This is accomplished via the method work.
- There are some initialization routines and a finalization routine. These are covered by the methods command_line, initialize, and finalize.
In step-by-step detail:
1. One of the processes decides it is a coordinator, and all the others decide they are workers. The processes each execute the method
   void Process::command_line ( int argc, char * argv [] );
   to store/process the command line arguments.
2. The coordinator process runs (one time only) the method
   void Process::initialize ( void );
3. The next three things happen in an interspersed fashion, on demand, as workers, jobs, and results become available:
   - The coordinator process calls the method
     int Process::prepare ( Message & job_message );
     in order to create job_message, which will eventually be sent to a worker.
   - A worker process calls the method
     void Process::work ( Message & result_message, Message const& job_message ) const;
     on job_message received from the coordinator in order to produce result_message, which is sent back to the coordinator.
   - The coordinator process calls the method
     void Process::accept ( Message const& result_message );
     on the result_message which was received from a worker.
4. The coordinator process runs (one time only) the method
   void Process::finalize ( void );
Note that routines in step 3 will happen many times and be interspersed, but the routines in 1, 2, and 4 each happen only once and happen in the order they are listed.
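The call order described above can be sketched without MPI. What follows is a single-process simulation under stated assumptions, not the library's implementation: TracedProcess and run_in_one_process are hypothetical names, plain int values stand in for Message objects, and prepare and accept strictly alternate here, whereas the real coordinator intersperses them as workers become available.

```cpp
#include <string>
#include <vector>

// Hypothetical stand-in for a user Process class; it records which
// coordinator-side method ran, in order. Not part of cluster-delegator.
struct TracedProcess {
  std::vector<std::string> log;
  int jobs_left = 2;

  void command_line ( int, char * [] ) { log.push_back ( "command_line" ); }
  void initialize () { log.push_back ( "initialize" ); }
  int prepare ( int & job ) {
    log.push_back ( "prepare" );
    if ( jobs_left == 0 ) return 1;  // Code 1: no jobs left
    job = jobs_left --;
    return 0;                        // Code 0: a job was produced
  }
  int work ( int job ) const { return 10 * job; }  // would run on a worker
  void accept ( int ) { log.push_back ( "accept" ); }
  void finalize () { log.push_back ( "finalize" ); }
};

// Runs steps 1 through 4 in a single process and returns the call log.
std::vector<std::string> run_in_one_process ( int argc, char * argv [] ) {
  TracedProcess process;
  process.command_line ( argc, argv );        // step 1 (every process)
  process.initialize ();                      // step 2 (coordinator, once)
  int job = 0;
  while ( process.prepare ( job ) == 0 ) {    // step 3 (repeated)
    process.accept ( process.work ( job ) );
  }
  process.finalize ();                        // step 4 (coordinator, once)
  return process.log;
}
```

Calling run_in_one_process(argc, argv) from main and printing the returned log shows command_line and initialize first, finalize last, and the prepare and accept calls in between.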
Jobs, Results, and Messages
Jobs and Results in cluster-delegator are the messages passed between the prepare, work, and accept methods. They are objects of type class Message. The Message class provides a uniform method to "serialize" your data so it can be passed around by the processes. In fact, class Message is really just a wrapper around the Boost serialization package.
The semantics of class Message are as follows:
Message job_message; // now I have an empty message
int jobdata1;
char jobdata2;
std::vector < int > jobdata3;
std::unordered_map < int, float > jobdata4;
std::string jobdata5;
// code to produce data in jobdata variables
job_message << jobdata1;
job_message << jobdata2;
job_message << jobdata3;
job_message << jobdata4;
job_message << jobdata5;
Later, this data may be extracted in precisely the same way, except that the << operators are replaced with >> operators. The data must be extracted in the same order it was inserted! (Not the reverse order, for example, or a random order.)
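The first-in, first-out rule can be illustrated with a small stand-in. ToyMessage below is a hypothetical class built on std::stringstream purely for illustration; it is not the real Message class (which wraps Boost serialization) and handles only int and std::string, but it shows values coming out in the order they went in.

```cpp
#include <iomanip>
#include <sstream>
#include <string>

// Hypothetical stand-in for Message: a text buffer read back front to back.
class ToyMessage {
public:
  ToyMessage & operator << ( int value ) {
    buffer_ << value << ' ';
    return *this;
  }
  ToyMessage & operator << ( std::string const& value ) {
    buffer_ << std::quoted ( value ) << ' ';  // quoting keeps embedded spaces
    return *this;
  }
  ToyMessage & operator >> ( int & value ) {
    buffer_ >> value;
    return *this;
  }
  ToyMessage & operator >> ( std::string & value ) {
    buffer_ >> std::quoted ( value );
    return *this;
  }
private:
  std::stringstream buffer_;
};

// Inserts an int, a string, and an int, then extracts them in the same order.
std::string round_trip () {
  ToyMessage message;
  message << 42 << std::string ( "Hello World" ) << 7;
  int first = 0;
  std::string second;
  int third = 0;
  message >> first >> second >> third;  // same order as insertion
  return second + " " + std::to_string ( first + third );
}
```

Extracting in a different order (for example, reading the string first) would try to parse "42" as quoted text and give the wrong values, which is the mistake the rule above guards against.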
Methods
There are six methods one writes in their Process class:
- command_line: Set command line arguments so they are available to Process
- initialize: Initialize in some fashion for what is to come
- prepare: Come up with a job to send to a worker and return 0, OR realize there are no jobs left to send and return 1, OR stall for results before producing a job and return 2
- work: Work a job and produce a result
- accept: Accept a result and handle it in some way
- finalize: Finish up with whatever is suitable after all jobs have been prepared and accepted
Only one of these methods has a return value, namely prepare. It returns 0, 1, or 2, depending on the situation. For simple programs, it returns 0 while it is producing jobs, and then switches to returning 1 to indicate there are no jobs left. More sophisticated programs might make use of the "2 option". The "2 option" says "There will be more jobs, but not right now." This would be used in the situation where results need to be accepted first to determine what ought to be computed next.
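A sketch of how the "2 option" might be used follows. It is a single-process simulation with hypothetical names (ChainedProcess, run_until_done) and int values in place of Message objects; the driver loop is an assumption made for illustration and is not how the library schedules work. Here each job is only issued after the previous result has been accepted.

```cpp
#include <queue>
#include <vector>

// Hypothetical process: squares 1, 2, 3, but issues job k+1 only after
// the result of job k has been accepted.
struct ChainedProcess {
  int next = 1;
  int outstanding = 0;
  std::vector<int> results;

  int prepare ( int & job ) {
    if ( next > 3 ) return 1;         // Code 1: no jobs left
    if ( outstanding > 0 ) return 2;  // Code 2: more jobs later, not now
    job = next ++;
    ++ outstanding;
    return 0;                         // Code 0: a job was produced
  }
  int work ( int job ) const { return job * job; }
  void accept ( int result ) {
    results.push_back ( result );
    -- outstanding;
  }
};

// Toy driver: keeps asking for jobs, and works one pending job whenever
// prepare declines to produce a new one.
std::vector<int> run_until_done () {
  ChainedProcess process;
  std::queue<int> pending;  // jobs produced but not yet worked
  for ( ;; ) {
    int job = 0;
    if ( process.prepare ( job ) == 0 ) { pending.push ( job ); continue; }
    if ( pending.empty () ) break;    // nothing in flight: stop
    process.accept ( process.work ( pending.front () ) );
    pending.pop ();
  }
  return process.results;
}
```

In this sketch prepare returns 2 each time a job is still in flight, so results are accepted one at a time and the returned vector holds the squares in job order.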
Default Implementation
The user-defined Process class is actually derived from a base class called Coordinator_Worker_Process. See the line from the "Hello World!" example above:
class Process : public Coordinator_Worker_Process {
The base class Coordinator_Worker_Process actually defines all the methods. When the user defines them again in the derived class Process, they override them with their own desired behaviors. However, the default behaviors in the base class are not entirely useless and can be used if one wants to simplify the program even further. To use the default behavior, one simply does not write the method definition in the derived class Process. (If one is new to C++, it may help to read a description of C++ class inheritance first.)
Note: If the user overrides all the default methods, the material in this section is not important and can be ignored. We will repeat this note a few times!
Base class member variables
The default implementations make use of some member variables belonging to the base class, which we now describe:
The default behavior of command_line (i.e. the base class implementation) is to set the base class member variables argc and argv to match the command line parameters handed to main.
/*----- Command line arguments -----*/
int argc;
char ** argv;
There are also a couple of stacks used by the base class implementations of prepare and accept:
/*----- The Message Stacks ------*/
std::stack < Message > JOBS_TO_SEND;
std::stack < Message > RECEIVED_RESULTS;
JOBS_TO_SEND is a stack which is intended to be populated by the user override of initialize. The user would create jobs, make them into job messages, and push them onto the JOBS_TO_SEND stack. The default implementation of the prepare method pops a job message from JOBS_TO_SEND and writes it to its argument (which is then sent off to a worker).
Note: It is not necessary to use JOBS_TO_SEND. This is for convenience only along with usage of the default prepare method.
RECEIVED_RESULTS is a stack which is automatically populated by the default implementation of accept. The default implementation of accept takes its argument (which is a result message received from some worker) and pushes it onto the RECEIVED_RESULTS stack. The user can then write an override for finalize in order to go through the RECEIVED_RESULTS stack and handle the results however they wish.
Note: It is not necessary to use RECEIVED_RESULTS. This is for convenience only along with usage of the default accept method.
Base class default methods
Here we describe the default behaviors of the methods of Coordinator_Worker_Process. These behaviors have been discussed already with respect to how they affect the member variables (in particular the message stacks), but we err on the side of redundancy and give them again here.
- command_line: Sets member variables argc and argv to match the passed parameters
- initialize: Does nothing by default
- prepare: Checks if there is an item on the member variable
  std::stack < Message > JOBS_TO_SEND
  If so, it pops that item from the stack, writes it to job_message, and returns code 0, which means "Produced a job." Otherwise, it returns code 1, which means "No jobs left to produce." It does not return code 2, which would mean "Waiting on information before jobs can be produced."
- work: Does nothing by default
- accept: Pushes the message result_message onto the member variable
  std::stack < Message > RECEIVED_RESULTS
- finalize: Does nothing by default
Note: It is not necessary to use any of the base class implementations. The user can override all of them.
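One consequence of the defaults being built on std::stack is worth seeing in code: a stack is last-in, first-out, so jobs pushed in initialize are handed out in reverse order of pushing. The sketch below is an assumption-laden stand-in, not the base class itself: ToyDefaults and dispatch_order are hypothetical names, and std::string replaces Message.

```cpp
#include <stack>
#include <string>
#include <vector>

// Hypothetical stand-in mirroring the default prepare and accept as
// described above; plain strings replace Message objects.
struct ToyDefaults {
  std::stack<std::string> JOBS_TO_SEND;
  std::stack<std::string> RECEIVED_RESULTS;

  // Pop a job if one exists (code 0), otherwise report none left (code 1).
  int prepare ( std::string & job_message ) {
    if ( JOBS_TO_SEND.empty () ) return 1;
    job_message = JOBS_TO_SEND.top ();
    JOBS_TO_SEND.pop ();
    return 0;
  }
  // Store the result so that finalize can read it later.
  void accept ( std::string const& result_message ) {
    RECEIVED_RESULTS.push ( result_message );
  }
};

// Pushes three jobs and returns the order in which prepare hands them out.
std::vector<std::string> dispatch_order () {
  ToyDefaults process;
  for ( char const* name : { "first", "second", "third" } ) {
    process.JOBS_TO_SEND.push ( name );
  }
  std::vector<std::string> order;
  std::string job;
  while ( process.prepare ( job ) == 0 ) order.push_back ( job );
  return order;
}
```

If the order in which jobs are sent matters, pushing them in reverse or overriding prepare are the two obvious options.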
Examples
Skeleton Example
The simplest program using the software is as follows:
#include "cluster-delegator.hpp"
class Process : public Coordinator_Worker_Process {};
int main ( int argc, char * argv [] ) {
delegator::Start ();
delegator::Run < Process > (argc, argv);
delegator::Stop ();
}
This program creates no jobs, sends no jobs, works no jobs, creates no results, and stores no results. But it does initialize the delegator system. And it can easily be modified into a program that does do something, by filling out class Process with overrides of methods in Coordinator_Worker_Process. These methods are discussed above.
example1
Perhaps the easiest way to use the software is to only override initialize, work, and finalize. This is what is done in
./examples/example1.cpp
Then we will have
- initialize: Invent jobs, turn them into job_message objects, and push them onto the member variable
  std::stack < Message > JOBS_TO_SEND
- work: Extract the job from job_message, do a computation on the job to produce a result, and create result_message from the result
- finalize: Read the result messages off the member variable
  std::stack < Message > RECEIVED_RESULTS
  and extract the results and deal with them as necessary.
example2
One does not have to use the default behavior of prepare and accept as done in example1, and may instead override them so as not to use the JOBS_TO_SEND and RECEIVED_RESULTS stacks. We provide an example somewhat more complicated than examples/helloworld.cpp in the source code as examples/example2.cpp.
Building MPI Programs
Building with CMake
To compile the examples, type (at the root of the distribution)
cd examples
../.install/build.sh
Note: This invokes the CMake build based on the ./examples/CMakeLists.txt file. You might find this file, as well as the files in ./.install/, useful for your own MPI builds.
Building by makefile
If you don’t like CMake, or are just curious to see another way, it is illustrative to see how this can be done by hand:
cd examples
mpicxx -O3 -std=c++11 -I../include/ -c -o example1.o example1.cpp
mpicxx -O3 -std=c++11 -I../include/ -c -o example2.o example2.cpp
mpicxx example1.o -o example1 -lboost_serialization
mpicxx example2.o -o example2 -lboost_serialization
You could of course make a makefile that caused these instructions to be executed.
Running MPI Programs
Single Machine
To run the first example, type
mpiexec -np 8 ./example1
To run the second example, type
mpiexec -np 8 ./example2 42 is the answer
The number "8" is arbitrary; it’s the number of processes you’d like. If you have a dual-core system, "3" is probably a good choice (one of the processes will spend most of its time sleeping). It’s OK to request more processes than you have cores; the OS can schedule them.
Cluster
Similar commands will probably work on clusters, though you may need more command line options (e.g. hostfiles and such). More likely, system administrators will require you to use scheduling software so that the system can be shared. Unless you just happen to own your very own super-duper cluster.
There are three scheduling programs we will discuss: PBS, SGE, and SLURM.
PBS
This is accomplished by writing a PBS script.
Here is what such a script looks like:
#!/bin/bash
#PBS -l nodes=10:ppn=8
cd $PBS_O_WORKDIR
mpiexec ./my_program
In this simple example, we specified to use 10 nodes with 8 processors per node.
To submit the program to the cluster, one would type
ssh my_account@my.fancy.cluster.edu
# ... get it ready ...
qsub script.sh #submit the job!
You can periodically check the progress of your computation by typing
qstat
If something seems wrong, you can terminate your program with
qdel
followed by your job number (which you can see from qstat)
SGE
TODO
SLURM
TODO
History
- 2015-07-04. Releasing v2.0.
  - Updated documentation
  - Added support for return value of 2 from prepare (i.e. "no jobs for now, but maybe later")
  - Switched to thread-based approach to handle communicator
  - Removed 128MB size limitation on messages
  - Introduced auto-sizing buffer
- 2014-08-13. Migrated to GitHub.
- 2011-08-24. Hosted on Google Code.
- Initial development was done in 2010.
Support
See the included examples, and you should be able to sort it out! If not, email me at sharker81@gmail.com.