Overview

Cluster-Delegator is a developer package that assists in writing C++ applications which use MPI on a cluster in a single-coordinator/many-worker (star topology) arrangement. The MPI interface is abstracted away. Instead, one constructs a program by writing methods for a class Process which define how to create jobs, how to work jobs to produce results, and how to process the results. The cluster-delegator machinery then takes care of all of the interprocess communication. Jobs and results are handled using the << and >> streaming operators, which package data structures (any data structure with a serialize method, as described in the Boost serialization manual) into objects of class Message.

Here is the "Hello World!" program:

#include "cluster-delegator.hpp"
#include <iostream>
#include <string>
#include <vector>

class Process : public Coordinator_Worker_Process {
public:
  void command_line ( int argcin, char * argvin [] ) {
    std::cout << "This is called by every process at start!\n";
  }
  void initialize ( void ) {
    std::cout << "This is called by the coordinator before jobs start!\n";
    current_job_number = 0;
    names = { "Shaun", "Pawel", "Ippei", "World" };
    total_number_of_jobs = names . size ();
  }
  int prepare ( Message & job_message ) {
    std::cout << "This is called by the coordinator to produce jobs!\n";
    if ( current_job_number == total_number_of_jobs ) return 1; // Code 1: No more jobs!
    job_message << names [ current_job_number ++ ];   return 0; // Code 0: A job was sent!
  }
  void work ( Message & result_message, Message const& job_message ) const {
    std::cout << "This method is called by a worker receiving a job!\n";
    std::string name; job_message >> name;
    std::string greeting = "Hello " + name + "!";
    result_message << greeting;
  }
  void accept ( Message const& result_message ) {
    std::cout << "This is called by the coordinator to process a result!\n";
    std::string data; result_message >> data;
    std::cout << data << "\n";
  }
  void finalize ( void ) {
    std::cout << "This is called by the coordinator when all jobs are done!\n";
  }
private:
  // Each process has its own copy of the member variables below:
  int current_job_number;
  int total_number_of_jobs;
  std::vector<std::string> names;
};

int main ( int argc, char * argv [] ) {
  delegator::Start ();
  delegator::Run < Process > (argc, argv);
  delegator::Stop ();
  return 0;
}

Here is (one possible) output of this "Hello World!" program with a 3 process MPI job:

> mpiexec -np 3 ./bin/helloworld
This is called by every process at start!
This is called by every process at start!
This is called by the coordinator before jobs start!
This is called by every process at start!
This is called by the coordinator to produce jobs!
This is called by the coordinator to produce jobs!
This is called by the coordinator to produce jobs!
This is called by the coordinator to produce jobs!
This method is called by a worker receiving a job!
This method is called by a worker receiving a job!
This is called by the coordinator to process a result!
Hello Pawel!
This is called by the coordinator to process a result!
Hello Ippei!
This is called by the coordinator to produce jobs!
This method is called by a worker receiving a job!
This method is called by a worker receiving a job!
This is called by the coordinator to process a result!
Hello Shaun!
This is called by the coordinator to process a result!
Hello World!
This is called by the coordinator when all jobs are done!

Acknowledgements

This software was developed in group discussions between Shaun Harker, Pawel Pilarczyk, and Ippei Obayashi in 2010. The current thread-based implementation is due to Shaun Harker.

The Coordinator-Worker scheme is described in Section 3 of the following paper:

 P. Pilarczyk, Parallelization method for a continuous property,
 Foundations of Computational Mathematics, Vol. 10, No. 1 (2010),
 93-114. DOI: 10.1007/s10208-009-9050-8.

Installation

The library comes in two flavors. Both are installed by default.

Header-only

There is a header-only version of the software, which can be used with the following include:

#include "cluster-delegator.hpp"

This is a single header file which contains the entire project. All functions are either templated or declared inline. This prevents duplicate symbol errors when linking together different translation units which both include this.

For backwards compatibility, the header-only file is copied to two locations. Assuming an install prefix of /usr/local, these are:

/usr/local/include/cluster-delegator.hpp  # Include the one here
/usr/local/include/delegator/delegator.h  # Not this deprecated one

Header and library

The second flavor is the header/library approach. To use this approach, include

#include "cluster-delegator.h"

and link against libcluster-delegator. Both static and shared libraries are available.

The libraries should be in

/usr/local/lib/libcluster-delegator.a
/usr/local/lib/libcluster-delegator.so

Note
On Mac OS X, the shared library has the extension .dylib instead of .so.

Files

To recap, the installation will produce the following files:

/usr/local/include/cluster-delegator.hpp  # single-file header-only library
/usr/local/include/delegator/delegator.h  # single-file header-only library (deprecated)
/usr/local/include/cluster-delegator.h    # declarations and template code only
/usr/local/lib/libcluster-delegator.a     # static library
/usr/local/lib/libcluster-delegator.so    # shared library

Dependencies

In order to use the software the following are required:

  • Boost 1.58 or later (compatibility of boost/serialization with C++11 types requires a version at least this recent)

  • OpenMPI

Mac OS X

  • Get home-brew (if you don’t already have it)

ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"

  • For the latest stable version, type

brew install shaunharker/tap/cluster-delegator

For the repo version,

  • Get open-mpi

brew install open-mpi

  • Get boost

brew install boost

  • Get cluster-delegator

git clone https://github.com/shaunharker/cluster-delegator.git
cd cluster-delegator
./install.sh

Either of these approaches will result in the software being installed into

/usr/local/include/

If another directory is required (e.g. you don’t have privileges) use

./install.sh --prefix=/path/to/install/folder

but be sure to include this path in -I directives when you compile your software which uses it.

Linux

For Linux, use a package manager, or else follow the directions on the OpenMPI and Boost websites to install the prerequisites. Then either download the tarball off Github (for the stable version) or else clone the repo. In either case, run

./install.sh --prefix=/path/to/install/folder

whereupon the cluster-delegator.hpp header file will be copied into

/path/to/install/include/

One might also refer to the .travis.yml file, the scripts in the hidden .install directory of the project, and the log of the latest Travis CI build for this project.

A common problem is that a user-installed Boost is either not found or an out-of-date system install interferes with it. This can be solved by setting the environment variable BOOST_ROOT to the path of the Boost installation (i.e. the path containing the include/boost and lib folders with the Boost header files and Boost library files). For example, in bash

export BOOST_ROOT=/path/to/boost/install
./install.sh --prefix=/path/to/install/folder

Another problem can occur when installations of different MPI implementations (e.g. Open MPI and MPICH) conflict. For instance, if CMake links against MPICH but mpiexec is the Open MPI flavor, then the behavior is undefined (it will probably hang).

If all else fails, write me at sharker@math.rutgers.edu.

The Scheme

In order to actually make something happen, the user needs to override methods from Coordinator_Worker_Process. To see how this works, we’ll describe the behavior of the algorithm called by Run<Process>():

  • Of the N processes, N-1 are workers and 1 is a coordinator.

  • The coordinator is responsible for preparing jobs and accepting results. This is accomplished via the methods prepare and accept.

  • The workers are responsible for working jobs to produce results. This is accomplished via the method work.

  • There are some initialization routines and a finalization routine. This is covered by the methods command_line, initialize, and finalize.

In step-by-step detail:

  1. One of the processes decides it is a coordinator, and all the others decide they are workers. The processes each execute the method

    void Process::command_line ( int argc, char * argv [] );

    to store/process the command line arguments.

  2. The coordinator process runs (one time only) the method

    void Process::initialize ( void );

  3. The next three things happen in an interspersed fashion, on demand, as workers, jobs, and results become available:

    1. The coordinator process calls the method

      int Process::prepare ( Message & job_message );

      in order to create job_message which will eventually be sent to a worker.

    2. A worker process calls the method

      void Process::work ( Message & result_message,
                          Message const& job_message ) const;

      on job_message received from the coordinator in order to produce result_message which is sent back to the coordinator.

    3. The coordinator process calls the method

      void Process::accept ( Message const& result_message );

      on the result_message which was received from a worker.

  4. The coordinator process runs (one time only) the method

    void Process::finalize ( void );

Note that routines in step 3 will happen many times and be interspersed, but the routines in 1, 2, and 4 each happen only once and happen in the order they are listed.
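
The steps above can be sketched as a single-process simulation. This is only an illustration of the calling order, not the library's implementation: ToyMessage, ToyProcess, and run_serially are hypothetical names, a std::string stands in for Message, no MPI is involved, and the return code 2 of prepare is not modeled. In the real system the coordinator and the workers are separate processes, each with its own copy of the member variables, and results may arrive in a different order.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Stand-in message type for this sketch; the real library uses Message.
using ToyMessage = std::string;

// Hypothetical process class with the same six methods as in the text.
class ToyProcess {
public:
  void command_line(int argc, char* argv[]) { args.assign(argv, argv + argc); }
  void initialize() {
    names_ = {"Shaun", "Pawel", "Ippei", "World"};
    next_ = 0;
  }
  int prepare(ToyMessage& job) {
    if (next_ == names_.size()) return 1;  // code 1: no more jobs
    job = names_[next_++];
    return 0;                              // code 0: a job was produced
  }
  void work(ToyMessage& result, const ToyMessage& job) const {
    result = "Hello " + job + "!";
  }
  void accept(const ToyMessage& result) {
    assert(!finished);  // results are never accepted after finalize
    greetings.push_back(result);
  }
  void finalize() { finished = true; }

  std::vector<std::string> args;
  std::vector<ToyMessage> greetings;
  bool finished = false;

private:
  std::vector<std::string> names_;
  std::size_t next_ = 0;
};

// Serial stand-in for the delegator: one object plays every role in turn.
template <class P>
P run_serially(int argc, char* argv[]) {
  P process;
  process.command_line(argc, argv);    // step 1: every process
  process.initialize();                // step 2: coordinator, once
  ToyMessage job;
  while (process.prepare(job) == 0) {  // step 3.1: coordinator
    ToyMessage result;
    process.work(result, job);         // step 3.2: a worker
    process.accept(result);            // step 3.3: coordinator
  }
  process.finalize();                  // step 4: coordinator, once
  return process;
}
```

Calling run_serially<ToyProcess>(argc, argv) from main returns the finished object, whose greetings member then holds one greeting per name.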

Jobs, Results, and Messages

Jobs and Results in cluster-delegator are the messages passed between prepare, work, and accept methods. They are objects of type class Message. The Message class provides a uniform method to "serialize" your data so it can be passed around by the processes. In fact, class Message is really just a wrapper around the Boost serialization package.
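
For user-defined types, the Boost serialization manual describes a serialize member template that hands each field to an archive with operator&. The sketch below shows that shape. GridJob, CountingArchive, and count_serialized_fields are hypothetical names invented for illustration; CountingArchive is a stand-in that merely counts fields so the snippet compiles without Boost, and it is not part of Boost or cluster-delegator.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical job payload. The serialize member template follows the
// convention from the Boost serialization manual: every field is handed
// to the archive with operator&.
struct GridJob {
  int level;
  std::vector<double> corners;
  std::string label;

  template <class Archive>
  void serialize(Archive& ar, const unsigned int /* version */) {
    ar & level;
    ar & corners;
    ar & label;
  }
};

// Stand-in archive (NOT part of Boost): it only counts the fields it is
// shown, so this sketch compiles and runs without any Boost headers.
struct CountingArchive {
  std::size_t fields = 0;

  template <class T>
  CountingArchive& operator&(T&) {
    ++fields;
    return *this;
  }
};

// Runs serialize against the stand-in archive and reports the field count.
inline std::size_t count_serialized_fields(GridJob& job) {
  CountingArchive ar;
  job.serialize(ar, 0);
  assert(ar.fields > 0);  // a job with no fields would carry no data
  return ar.fields;
}
```

A type written this way is the kind of data structure the overview says can be streamed into a Message with the << operator; forgetting a field in serialize means that field is silently left out of the message.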

The semantics of class Message are as follows:

Message job_message;  // now I have an empty message
int jobdata1;
char jobdata2;
std::vector < int > jobdata3;
std::unordered_map < int, float > jobdata4;
std::string jobdata5;
// code to produce data in jobdata variables
job_message << jobdata1;
job_message << jobdata2;
job_message << jobdata3;
job_message << jobdata4;
job_message << jobdata5;

Later, this data may be extracted in precisely the same way, except we replace the << operators with >> operators. And you must extract the data in the same order it was inserted! (Not the reverse order, for example, or in a random order.)
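
The ordering rule can be illustrated with a toy stand-in. ToyMessage below is a hypothetical class that stores each value as text in first-in, first-out order; it is not the library's Message and says nothing about how the real class is implemented. It only shows why extraction must mirror insertion.

```cpp
#include <cassert>
#include <deque>
#include <sstream>
#include <string>

// Stand-in for the library's Message (NOT the real class): values are
// stored as text and handed back in first-in, first-out order.
class ToyMessage {
public:
  template <class T>
  ToyMessage& operator<<(const T& value) {
    std::ostringstream out;
    out << value;
    fields_.push_back(out.str());
    return *this;
  }

  template <class T>
  ToyMessage& operator>>(T& value) {
    assert(!fields_.empty());  // extracting more than was inserted is a bug
    std::istringstream in(fields_.front());
    in >> value;
    fields_.pop_front();
    return *this;
  }

private:
  std::deque<std::string> fields_;
};

// Inserts an int and then a double, and extracts them in the same order.
inline bool round_trip_in_order() {
  ToyMessage m;
  m << 42 << 2.5;
  int a = 0;
  double b = 0.0;
  m >> a >> b;  // same order as insertion: int first, then double
  return a == 42 && b == 2.5;
}

// Extracts in the wrong order: in this toy, b receives 42 and a receives 2
// (the text "2.5" read as an int), so the data is silently wrong.
inline bool round_trip_swapped() {
  ToyMessage m;
  m << 42 << 2.5;
  int a = 0;
  double b = 0.0;
  m >> b >> a;
  return a == 42 && b == 2.5;
}
```

In this toy, a mismatched extraction order yields wrong values without any error; how the real Message behaves in that case is not specified here, which is one more reason to keep the orders identical.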

Methods

There are six methods one writes in their Process class:

  1. command_line: Set command line arguments so they are available to Process

  2. initialize: Initialize in some fashion for what is to come

  3. prepare: Come up with a job to send to a worker and return 0, OR realize there are no jobs left to send and return 1, OR stall for results before producing a job and return 2

  4. work: work a job and produce a result

  5. accept: accept a result and handle it in some way

  6. finalize: Finish up with whatever is suitable after all jobs have been prepared and accepted.

Only one of these methods has a return value, namely prepare. It returns 0, 1, or 2, depending on the situation. For simple programs, it returns 0 while it is producing jobs, and then switches to returning 1 to indicate there are no jobs left. More sophisticated programs might make use of the "2 option". The "2 option" says "There will be more jobs, but not right now." This would be used in the situation where results need to be accepted first to determine what ought to be computed next.
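
As a sketch of when returning 2 from prepare is useful, consider a two-stage computation in which the second stage depends on every first-stage result. TwoStage and run_two_stage are hypothetical names, plain ints stand in for Message objects, and the driver loop is a serial toy written for this illustration, not the delegator's scheduling logic.

```cpp
#include <cassert>
#include <queue>

// Hypothetical two-stage process; plain ints stand in for Message.
// Stage 1 squares the numbers 1, 2, 3. Stage 2 squares the sum of the
// stage-1 results, so it cannot be prepared until all three are accepted.
class TwoStage {
public:
  // Returns 0 (job produced), 1 (no jobs left), or 2 (wait for results).
  int prepare(int& job) {
    if (sent_ < 3) { job = ++sent_; return 0; }  // stage 1 jobs
    if (accepted_ < 3) return 2;                 // more jobs later, not now
    if (!final_sent_) {                          // stage 2 job
      job = sum_;
      final_sent_ = true;
      return 0;
    }
    return 1;
  }
  void work(int& result, const int& job) const { result = job * job; }
  void accept(const int& result) {
    ++accepted_;
    if (accepted_ <= 3) sum_ += result;  // a stage-1 result
    else answer = result;                // the stage-2 result
  }

  int answer = 0;

private:
  int sent_ = 0;
  int accepted_ = 0;
  int sum_ = 0;
  bool final_sent_ = false;
};

// Toy serial driver: keeps asking for jobs, and whenever prepare declines
// (code 1 or 2) it lets a pretend worker finish one job that is in flight.
inline int run_two_stage() {
  TwoStage p;
  std::queue<int> in_flight;
  bool no_more_jobs = false;
  while (!no_more_jobs || !in_flight.empty()) {
    int job = 0;
    int code = no_more_jobs ? 1 : p.prepare(job);
    if (code == 0) { in_flight.push(job); continue; }
    if (code == 1) no_more_jobs = true;
    // Code 2 with nothing in flight would mean waiting forever.
    assert(code == 1 || !in_flight.empty());
    if (!in_flight.empty()) {
      int result = 0;
      p.work(result, in_flight.front());
      in_flight.pop();
      p.accept(result);
    }
  }
  return p.answer;
}
```

Here prepare returns 2 three times while the stage-1 results come back, then produces the stage-2 job (the sum 1 + 4 + 9 = 14), and run_two_stage returns 196.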

Default Implementation

The user-defined Process class is actually derived from a base class called Coordinator_Worker_Process: See the line from the "Hello World!" example above:

class Process : public Coordinator_Worker_Process {

The base class Coordinator_Worker_Process actually defines all the methods. When the user defines them again in the derived class Process, they override them with their own desired behaviors. However, the default behaviors in the base class are not entirely useless and can be used if one wants to simplify the program even further. To use the default behavior, one simply does not write the method definition in the derived class Process. (Readers new to C++ may want to consult a description of C++ class inheritance.)

Note
If the user overrides all the default methods, the material in this section is not important and can be ignored. We will repeat this note a few times!

Base class member variables

The default implementations make use of some member variables belonging to the base class, which we now describe:

The default behavior of command_line (i.e. the base class implementation) is to set base class member variables argc and argv to match the command line parameters handed to main.

/*----- Command line arguments -----*/
int argc;
char ** argv;

There are also a couple stacks used by the base class implementations of prepare and accept:

/*----- The Message Stacks ------*/
std::stack < Message > JOBS_TO_SEND;
std::stack < Message > RECEIVED_RESULTS;

JOBS_TO_SEND is a stack which is intended to be populated by the user override of initialize. The user would create jobs, make them into job messages, and push them onto the JOBS_TO_SEND stack. The default implementation of the prepare method pops a job message from JOBS_TO_SEND and writes it to its argument (which is then sent off to a worker).

Note
It is not necessary to use JOBS_TO_SEND. This is for convenience only along with usage of the default prepare method.

RECEIVED_RESULTS is a stack which is automatically populated by the default implementation of accept. The default implementation of accept takes its argument (which is a result message received from some worker) and pushes it onto the RECEIVED_RESULTS stack. The user can then write an override for finalize in order to go through the RECEIVED_RESULTS stack and handle the results however they wish.

Note
It is not necessary to use RECEIVED_RESULTS. This is for convenience only along with usage of the default accept method.

Base class default methods

Here we describe the default behaviors of the methods of Coordinator_Worker_Process. These behaviors have been discussed already with respect to how they affect the member variables (in particular the message stacks), but we err on the side of redundancy and give them again here.

  • command_line: Sets member variables argc and argv to match the passed parameters

  • initialize: Does nothing by default

  • prepare: Checks if there is an item on member variable

std::stack < Message > JOBS_TO_SEND

if so, it pops that item from the stack and writes it to job_message and returns code 0, which means "Produced a job." Otherwise, it returns code 1, which means "No jobs left to produce." It does not return code 2, which would mean "Waiting on information before jobs can be produced."

  • work: Does nothing by default

  • accept: Pushes the message result_message onto the member variable

std::stack < Message > RECEIVED_RESULTS

  • finalize: Does nothing by default

Note
It is not necessary to use any of the base class implementations. The user can override all of them.
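
The default behaviors described above can be sketched with stand-in classes. ToyBase mirrors the described defaults, and Shouter is a hypothetical user class that overrides only initialize, work, and finalize. The virtual functions, the protected access, the std::string stand-in for Message, and the serial driver run_shouter are all assumptions made for this illustration, not the library's actual declarations.

```cpp
#include <cassert>
#include <stack>
#include <string>
#include <vector>

using ToyMessage = std::string;  // stand-in for the library's Message

// Stand-in base class mirroring the default behaviors described above.
class ToyBase {
public:
  virtual ~ToyBase() {}
  virtual void initialize() {}
  virtual int prepare(ToyMessage& job) {
    if (JOBS_TO_SEND.empty()) return 1;  // code 1: no jobs left
    job = JOBS_TO_SEND.top();
    JOBS_TO_SEND.pop();
    return 0;                            // code 0: produced a job
  }
  virtual void work(ToyMessage&, const ToyMessage&) const {}
  virtual void accept(const ToyMessage& result) {
    RECEIVED_RESULTS.push(result);
  }
  virtual void finalize() {}

protected:
  std::stack<ToyMessage> JOBS_TO_SEND;
  std::stack<ToyMessage> RECEIVED_RESULTS;
};

// Hypothetical user class relying on the default prepare and accept.
class Shouter : public ToyBase {
public:
  void initialize() override {
    JOBS_TO_SEND.push("a");
    JOBS_TO_SEND.push("b");
  }
  void work(ToyMessage& result, const ToyMessage& job) const override {
    result = job + "!";
  }
  void finalize() override {
    while (!RECEIVED_RESULTS.empty()) {
      collected.push_back(RECEIVED_RESULTS.top());
      RECEIVED_RESULTS.pop();
    }
  }

  std::vector<ToyMessage> collected;
};

// Serial stand-in for the delegator loop (no MPI involved).
inline std::vector<ToyMessage> run_shouter() {
  Shouter p;
  p.initialize();
  ToyMessage job;
  while (p.prepare(job) == 0) {
    ToyMessage result;
    p.work(result, job);
    p.accept(result);
  }
  p.finalize();
  assert(p.collected.size() == 2);  // one result per job pushed
  return p.collected;
}
```

Because std::stack is last-in, first-out, the job pushed last ("b") is the first one handed out by the default prepare. Programs that care about job order should push jobs in reverse or override prepare.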

Examples

Skeleton Example

The simplest program using the software is as follows:

#include "cluster-delegator.hpp"

class Process : public Coordinator_Worker_Process {};

int main ( int argc, char * argv [] ) {
  delegator::Start ();
  delegator::Run < Process > (argc, argv);
  delegator::Stop ();
}

This program creates no jobs, sends no jobs, works no jobs, creates no results, and stores no results. But it does initialize the delegator system. It can easily be modified into a program that does something by filling out class Process with overrides of methods in Coordinator_Worker_Process. These methods are discussed above.

example1

Perhaps the easiest way to use the software is to only override initialize, work, and finalize. This is what is done in

./examples/example1.cpp

Then we will have

  • initialize: Invent jobs, turn them into job_message objects, and push them onto the member variable

std::stack < Message > JOBS_TO_SEND

  • finalize: Read the result_messages off the member variable

std::stack < Message > RECEIVED_RESULTS

    and extract the results and deal with them as necessary.

  • work: Extract the job from job_message, do a computation on the job to produce a result, and create result_message from the result

example2

One does not have to use the default behavior of prepare and accept as done in example1 and may instead override them not to use the JOBS_TO_SEND and RECEIVED_RESULTS stacks. We provide an example somewhat more complicated than examples/helloworld.cpp in the source code as examples/example2.cpp.

Building MPI Programs

Building with CMake

To compile the examples, type (at the root of the distribution)

cd examples
../.install/build.sh

Note
This invokes the CMake build based on the ./examples/CMakeLists.txt file. You might find this file, as well as the files in the ./.install/ directory, useful for your own MPI builds.

Building by makefile

If you don’t like CMake, or are just curious to see another way, it is illustrative to see how this can be done by hand:

cd examples
mpicxx -O3 -std=c++11 -I../include/ -c -o example1.o example1.cpp
mpicxx -O3 -std=c++11 -I../include/ -c -o example2.o example2.cpp
mpicxx example1.o -o example1 -lboost_serialization
mpicxx example2.o -o example2 -lboost_serialization

You could of course make a makefile that caused these instructions to be executed.

Running MPI Programs

Single Machine

To run the first example, type

 mpiexec -np 8 ./example1

To run the second example, type

 mpiexec -np 8 ./example2 42 is the answer

The number "8" is arbitrary; it’s the number of processes you’d like to launch. If you have a dual-core system, "3" is probably a good choice (one of the processes will spend most of its time sleeping). It’s OK to request more processes than you have cores; the OS can schedule them.
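
The rule of thumb above (one process per core, plus one extra because one process mostly sleeps) can be written down as a small helper. This is only a sketch of that heuristic; suggested_process_count is a hypothetical name and is not part of cluster-delegator.

```cpp
#include <cassert>
#include <thread>

// One process per core, plus one extra, since one of the processes
// spends most of its time sleeping.
inline unsigned suggested_process_count(unsigned cores) {
  assert(cores > 0);
  return cores + 1;
}

// Uses the detected core count, falling back to 1 when it is unknown
// (std::thread::hardware_concurrency may return 0).
inline unsigned suggested_process_count() {
  unsigned cores = std::thread::hardware_concurrency();
  return suggested_process_count(cores == 0 ? 1u : cores);
}
```

For a dual-core machine, suggested_process_count(2) gives 3, matching the advice above; the result would be passed to mpiexec as the -np value.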

Cluster

Commands similar to the above will probably work on clusters, though you may need more command line options (e.g. hostfiles and such). More likely, the system administrators will require you to use scheduling software so the system can be shared, unless you just happen to own your very own super-duper cluster.

There are three scheduling programs we will discuss: PBS, SGE, and SLURM.

PBS

With PBS, one submits a job by writing a PBS script.

Here is what such a script looks like:

#!/bin/bash
#PBS -l nodes=10:ppn=8
cd $PBS_O_WORKDIR
mpiexec ./my_program

In this simple example, we specified to use 10 nodes with 8 processors per node.

To submit the program to the cluster, one would type

ssh my_account@my.fancy.cluster.edu
# ... get it ready ...
qsub script.sh   #submit the job!

You can periodically check the progress of your computation by typing

qstat

If something seems wrong, you can terminate your program with

qdel

followed by your job number (which you can see from qstat).

SGE

TODO

SLURM

TODO

History

  • 2015-07-04. Releasing v2.0.

    • Updated documentation

    • Added support for return value of 2 from prepare (i.e. "no jobs for now, but maybe later")

    • Switched to thread-based approach to handle communicator

    • Removed 128MB size limitation on messages

    • Introduced auto-sizing buffer

  • 2014-08-13 — migrated to github

  • 2011-08-24 — hosted on googlecode

  • Initial development was done in 2010.

Support

See the included examples, and you should be able to sort it out! If not, email me at sharker81@gmail.com.