Message Passing Interface
CS 441/641 Lecture, Dr. Lawlor
MPI is a standardized communication library that actually *is* a
standard: if you build a supercomputer, you put MPI on it.
Period. MPI is implemented as a library, with no compiler
support, which makes it more portable but less convenient than OpenMP.
First, read this basic MPI tutorial, which has a good survey of all the MPI routines. The gory details are in the MPI 1.1 standard,
which includes some examples and is fairly readable for a standard (that's not saying much: most standards are
hideously unreadable).
Pay particular attention to the "big eight" MPI
functions:
- MPI_Init and MPI_Finalize:
these set up and tear down MPI. MPI_Init sets up your
environment--lots of stuff, from command-line arguments to file I/O,
doesn't work right until you call MPI_Init, so call it before you do
anything. Also, you absolutely MUST call MPI_Finalize under every
possible exit path, or many MPI implementations won't kill the other
processes correctly, leaving zombies to prey on living processes.
Because of these dangers, every MPI main program should start with
*exactly* this code:
#include <mpi.h>
#include <stdlib.h> /* for atexit */

void my_exit_fn(void) {MPI_Finalize();}

int main(int argc,char *argv[])
{
    MPI_Init(&argc,&argv);
    atexit(my_exit_fn);
    ... now start actual work ...
}
- MPI_Comm_size and MPI_Comm_rank:
these return your process number (your "rank"), and the number of
processes (the "size") in a "communicator", which is almost always just
the whole machine, called MPI_COMM_WORLD. Here's how you get your
rank and size:
int rank,size;
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
MPI_Comm_size(MPI_COMM_WORLD,&size);
- MPI_Send and MPI_Recv:
these "point to point" functions just send bytes from one place to
another. They're the meat and potatoes of MPI. The
arguments give a contiguous list of data in memory, of a fixed length,
of a given data type,
being sent to a given destination rank, with any integer "tag" you like
(a tag of zero works fine), on a communicator (almost always
MPI_COMM_WORLD). MPI_Recv takes an "MPI_Status" pointer, where
MPI can stash the actual received message length and source
process (see the status example after the first full program below). I remember the first six arguments for the send/receive
calls, MPI_Send(void* buffer, int count, MPI_Datatype datatype, int processor_dest, int tag, MPI_Comm comm), using the mnemonic "Bob Can't Do Peanuts with That Crap" (no offense to Bob).
- MPI_Bcast and MPI_Reduce:
these "collective" functions broadcast data from one processor out to
every processor, or reduce (sum up) data from all processors to one "master"
processor.
Yes, you could just build these yourself by doing repeated sends (for
bcast) or receives (for reduce), but these implementations are usually
designed to scale well: for example, they use a hardware broadcast, or
a tree.
Those are really
the only functions you need to learn in MPI 1.1; the rest are just small variations on
those themes.
For example, here's an idiomatic MPI program where the first process sends one integer to the last process:
#include <mpi.h> /* for MPI_ functions */
#include <stdio.h> /* for printf */
#include <stdlib.h> /* for atexit */

void call_finalize(void) {MPI_Finalize();}

int main(int argc,char *argv[]) {
    MPI_Init(&argc,&argv);
    atexit(call_finalize); /*<- important to avoid weird errors! */

    int rank=0,size=1;
    MPI_Comm_rank(MPI_COMM_WORLD,&rank);
    MPI_Comm_size(MPI_COMM_WORLD,&size);

    int tag=17; /*<- random integer ID for this message exchange */
    if (rank==0) {
        int val=1234;
        MPI_Send(&val,1,MPI_INT, size-1,tag,MPI_COMM_WORLD);
        printf("Rank %d sent value %d\n",rank,val);
    }
    if (rank==size-1) {
        MPI_Status sts;
        int val=0;
        MPI_Recv(&val,1,MPI_INT, MPI_ANY_SOURCE,tag,MPI_COMM_WORLD,&sts);
        printf("Rank %d received value %d\n",rank,val);
    }
    return 0;
}
(Try this in NetRun now!)
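The receive above uses MPI_ANY_SOURCE, so the only way to find out who actually sent the message, or how much data arrived, is to look inside the MPI_Status. Here's a minimal sketch of that (my code, not part of the program above), using MPI_Get_count plus the status's MPI_SOURCE and MPI_TAG fields:
    MPI_Status sts;
    int val=0;
    MPI_Recv(&val,1,MPI_INT, MPI_ANY_SOURCE,MPI_ANY_TAG,MPI_COMM_WORLD,&sts);
    int count=0;
    MPI_Get_count(&sts,MPI_INT,&count); /* actual number of ints received */
    printf("Got %d ints from rank %d with tag %d\n",
        count, sts.MPI_SOURCE, sts.MPI_TAG);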
Here's a simple parameter sweep program:
#include <mpi.h>
#include <iostream> /* for std::cout */
#include <cstdlib> /* for atexit */
#include <vector>

extern "C" void call_mpi_finalize(void) {MPI_Finalize();}

int main(int argc,char **argv)
{
    MPI_Init(&argc,&argv);
    atexit(call_mpi_finalize); // call MPI_Finalize before exiting
    int rank=0, size=1; // my processor, n processors
    MPI_Comm_rank(MPI_COMM_WORLD,&rank);
    MPI_Comm_size(MPI_COMM_WORLD,&size);
    std::cout<<"I am rank "<<rank<<" of "<<size<<"\n";

    // Do local work
    int n=1000000; // problem size across whole machine
    int first=rank*n/size; // start of my piece of the problem
    int last=(rank+1)*n/size; // end of my piece of the problem
    std::vector<int> good;
    for (int i=first;i<last;i++) {
        if ((i%17==0) && (i%451==13)) good.push_back(i);
    }

    // Sum local pieces across processors
    int total=0;
    int mygood=good.size();
    MPI_Reduce(&mygood,&total,1,MPI_INT, MPI_SUM, 0,MPI_COMM_WORLD);

    // rank 0 now has the total, so send it out to everybody
    MPI_Bcast(&total,1,MPI_INT, 0,MPI_COMM_WORLD);
    std::cout<<"Total n good: "<<total<<"\n";
    return 0;
}
(Try this in NetRun now!)
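The Reduce-then-Bcast pair at the end of that program is such a common idiom that MPI provides MPI_Allreduce, which does both in one collective call. A minimal sketch, reusing mygood and total from the program above:
    int total=0;
    // Every rank ends up with the global sum--no separate broadcast step needed.
    MPI_Allreduce(&mygood,&total,1,MPI_INT, MPI_SUM, MPI_COMM_WORLD);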
Here's a simple MPI integration program:
#include "mpi.h"
#include <math.h>
int main(int argc,char *argv[]) {
MPI_Init(&argc,&argv); /* <- *sets* argc and argv! */
int rank=0, size=1;
MPI_Comm_size(MPI_COMM_WORLD,&size);
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
if (rank==0) std::cout<<"Started up: I am "<<rank<<" of "<<size<<"\n";
double start=MPI_Wtime();
int total=10000000; // iterations for all processors
int per=total/size; // iterations per processor
int first=per*rank; // my first iteration
int last=first+per; // my last iteration
double t=0.0;
for (int i=first;i<last;i++) t+=sin(log(1.0+i));
// Sum up "t" across all processors
double sumt=0.0;
MPI_Reduce(&t,&sumt,1,MPI_DOUBLE,MPI_SUM,0, MPI_COMM_WORLD);
if (rank==0) {
double elapsed=MPI_Wtime()-start;
std::cout<<"t="<<sumt<<" in "<<elapsed<<"s on "<<size<<" cpus\n";
}
MPI_Finalize();
return 0;
}
(Try this in NetRun now!)
Here's a more complex program that renders part of the Mandelbrot set on each MPI process, and assembles the pieces on rank 0:
/*
Mandelbrot renderer in MPI
Dr. Orion Lawlor, 2010-11-30 (Public Domain)
*/
#include <mpi.h>
#include <iostream>
#include <fstream>
#include <cstring> /* for memcpy */
#include <complex>
/**
 A linear function in 2 dimensions: returns a double as a function of (x,y).
*/
class linear2d_function {
public:
    double a,b,c;
    void set(double a_,double b_,double c_) {a=a_;b=b_;c=c_;}
    linear2d_function(double a_,double b_,double c_) {set(a_,b_,c_);}
    double evaluate(double x,double y) const {return x*a+y*b+c;}
};

const int wid=1000, ht=1000;

// Set up coordinate system to render the Mandelbrot Set:
double scale=3.0/wid;
linear2d_function fx(scale,0.0,-1.0); // returns c given pixels
linear2d_function fy(0.0,scale,-1.0);

char render_mset(int x,int y) {
    /* Walk this Mandelbrot Set pixel */
    typedef std::complex<double> COMPLEX;
    COMPLEX c(fx.evaluate(x,y),fy.evaluate(x,y));
    COMPLEX z(0.0);
    int count;
    enum {max_count=256};
    for (count=0;count<max_count;count++) {
        z=z*z+c;
        if ((z.real()*z.real()+z.imag()*z.imag())>4.0) break;
    }
    return count;
}

class row {
public:
    char data[wid];
};
int main(int argc,char *argv[]) {
    /* Before MPI_Init: MPI's args, MPI's random working directory */
    MPI_Init(&argc,&argv);
    /* After MPI_Init: your command line args, your working directory */
    int size,rank;
    MPI_Comm_size(MPI_COMM_WORLD,&size);
    MPI_Comm_rank(MPI_COMM_WORLD,&rank);
    std::cout<<"I am "<<rank<<" of "<<size<<"\n";

    int procpiece=ht/size; /* rows of the image each process renders */
    int gystart=rank*procpiece; /* global y coordinate where my rows start */
    row limg[procpiece]; /* local copy of the final image */
    double start=MPI_Wtime();

    /* Render our piece of the image */
    for (int y=0;y<procpiece;y++)
    {
        for (int x=0;x<wid;x++) limg[y].data[x]=render_mset(x,gystart+y);
    }
    double elapsed_compute=MPI_Wtime()-start;

    int tag=12378;
    if (rank>0)
    { /* send our partial piece to rank 0 */
        //skt_sendN(s[0],&limg[0].data[0],sizeof(row)*procpiece);
        MPI_Send(&limg[0].data[0],sizeof(row)*procpiece,MPI_CHAR,
            0,tag,MPI_COMM_WORLD);
    }
    else
    { /* rank 0: receive partial pieces from other ranks */
        row gimg[ht];
        for (int r=0;r<size;r++)
            if (r==0) {
                memcpy(gimg,limg,sizeof(row)*procpiece);
            } else {
                //skt_recvN(s[r],&gimg[r*procpiece].data[0],
                //    sizeof(row)*procpiece);
                MPI_Status status;
                MPI_Recv(&gimg[r*procpiece].data[0],sizeof(row)*procpiece,MPI_CHAR,
                    r,tag,MPI_COMM_WORLD,&status);
            }

        /* Print out assembled image */
        std::ofstream of("out.ppm",std::ios_base::binary);
        of<<"P5\n"; // greyscale, binary
        of<<wid<<" "<<ht<<"\n"; // image size
        of<<"255\n"; // byte image
        of.write(&gimg[0].data[0],sizeof(row)*ht);
    }
    double elapsed_send=MPI_Wtime()-start;
    std::cout<<"Rank "<<rank<<": "<<1000.0*elapsed_compute<<"ms compute, "<<
        1000.0*elapsed_send<<"ms total\n";
    MPI_Finalize();
    return 0;
}
(Try this in NetRun now!)
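Incidentally, rank 0's receive loop is exactly the pattern that the collective MPI_Gather implements for you. Here's a hedged sketch--my code, not part of the program above--that reuses row, limg, and procpiece from it, and like the original assumes ht divides evenly by size:
    /* Every rank contributes its procpiece rows; rank 0 receives them in rank order. */
    row gimg[ht]; /* only rank 0's copy actually gets filled in */
    MPI_Gather(&limg[0].data[0], sizeof(row)*procpiece, MPI_CHAR,
        &gimg[0].data[0], sizeof(row)*procpiece, MPI_CHAR,
        0, MPI_COMM_WORLD);
    /* Rank 0 now holds the assembled image, including its own rows--
       no per-rank MPI_Recv or memcpy needed. */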
MPI Performance on the Powerwall
The UAF Bioinformatics Powerwall
is a fairly typical 2006-era cluster, with gigabit ethernet connecting
ten dual-core nodes. It's running OpenMPI 1.3. Here's how it performs:
Test send/recv_int took 47.113us per iteration (10000 iterations)
Sending even a small, one-int message takes a *long* time (high alpha
cost; about 50 us). This is mostly a function of your network
hardware.
Test sandbag (100) took 49.522us per iteration (1000 iterations)
The "sandbag" is my function that does CPU work.
Test send/recv_int + sandbag(100) took 96.148us per iteration (10000 iterations)
If we both compute and communicate, the times should add nearly
linearly, since the CPU and the network are not working at the same time; and indeed they do here: roughly 47us + 50us ≈ 96us.
Test isend/irecv_int + sandbag(100) took 53.740us per iteration (100 iterations)
For better communication performance, use Isend and Irecv, which are "nonblocking": the CPU can keep working while the network talks.
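The usual pattern is to post the nonblocking call, do your computing, then wait before touching the buffer. A minimal sketch (do_some_cpu_work is a made-up placeholder):
    MPI_Request req;
    int val=0;
    MPI_Irecv(&val,1,MPI_INT, 0,0,MPI_COMM_WORLD,&req); /* returns immediately */
    do_some_cpu_work(); /* placeholder: CPU computes while the network delivers */
    MPI_Status sts;
    MPI_Wait(&req,&sts); /* don't read val until the wait completes */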
Test send/recv_zero length took 46.548us per iteration (10000 iterations)
Test send/recv_1k took 72.320us per iteration (100 iterations)
Test send/recv_1meg took 9169.580us per iteration (100 iterations)
Short messages, even zero length, are expensive due to alpha
cost. A one-kilobyte message costs only about 50% more than a zero-length
one! Communication costs under 10ns/byte for long messages,
since the startup overhead amortizes away. OpenMPI really can
deliver over 100MB/sec on gigabit ethernet.
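To put numbers on that, model an n-byte message as costing alpha + n*beta: the measurements above give alpha of about 47us and beta of about 8.7ns/byte, so a one-megabyte message should take roughly 47us + 1048576*8.7ns, or about 9.2ms, which matches the 1meg test. (That alpha-beta fit is my back-of-the-envelope arithmetic, not part of the benchmark output.)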
Test send/recv_int_overlap took 2.792us per iteration (1000 iterations)
Here's another curiosity--if I do repeated one-int sends to the same
destination, MPI is smart enough to start bundling the ints together
into longer messages. Of course, it's still far more expensive
than just bundling them yourself.
Test send/recv_1meg + sandbag(10000) took 14111.791us per iteration (100 iterations)
Test isend/irecv_1meg + sandbag(10000) took 14018.261us per iteration (100 iterations)
However, MPI doesn't seem to overlap long messages very well--this is
clearly first compute, then communicate. (I might need to add an
"MPI_Test" call or something inside my sandbag function.) Isend doesn't help much here either, for some reason.
(2 cpus) Test barrier took 54.131us per iteration (1000 iterations)
(10 cpus)Test barrier took 242.433us per iteration (1000 iterations)
A barrier requires only one synchronizing message on two CPUs.
(2 cpus) Test bcast_int + barrier took 100.409us per iteration (1000 iterations)
(10 cpus)Test bcast_int + barrier took 390.508us per iteration (1000 iterations)
A broadcast costs about the same as one network message on two
processors. On more processors, it's more expensive, but not
tenfold more like the naive "process zero sends once to everybody"
algorithm. On modern hardware, broadcast is very fast.
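As a rough sanity check on the tree claim (my arithmetic, not from the benchmark): a tree broadcast needs about log2(P) message steps, so on 10 CPUs that's 3-4 steps of roughly 50us each, or 150-200us of broadcast on top of the ~240us barrier. That is consistent with the ~390us measured, and well under the 400us-plus of back-to-back sends the naive one-to-everybody loop would add.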
(2 cpus) Test reduce_int + barrier took 100.939us per iteration (1000 iterations)
(10 cpus)Test reduce_int + barrier took 328.141us per iteration (1000 iterations)
Similarly, a reduction costs basically one message (on two processors).