MPI: Message Passing Interface

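MPI (Message Passing Interface) is a standard programming model for parallel and distributed computing. Processes exchange data by sending and receiving messages, and the MPI library and launcher take care of the underlying connection setup. For example, with 4 machines each having 16 CPU cores, there are 64 cores in total. An MPI program can be launched as 64 processes, one per core, and each process addresses the others by rank in the same way regardless of which machine they run on.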

Note that there is a significant performance difference depending on where the data transfer occurs. Transfers between cores on the same machine are relatively fast. Transfers between cores on different machines are relatively slow.

The following functions are the basic building blocks of MPI programming. This post will be updated later with more information.

Send

int MPI_Send(void *smessage, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm)

Sends count elements of type datatype from the smessage buffer to the process whose rank is dest. The tag is a user-chosen integer label that the receiver can match on to tell messages apart. The communicator is passed via comm.

Receive

int MPI_Recv(void *rmessage, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status *status)

Receives a message into the rmessage buffer, which must have room for count elements of type datatype (count is an upper bound; the actual message may be shorter). The rank of the sending process is given by source, and tag selects which messages to accept. The communicator is passed via comm. Information about the received message, such as its actual source and tag, is returned in status.

Tag

In MPI_Recv, you may pass MPI_ANY_SOURCE as source or MPI_ANY_TAG as tag to receive a message from any source or with any tag. The actual source and tag can then be read from status.

Deadlock

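Blocking MPI calls can deadlock. If every process is waiting for another process to act, for example two processes that each start a synchronous send to the other before posting a receive, execution halts indefinitely.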

MPI Send Variants

MPI supports several variants of MPI_Send. In fact, the standard MPI_Send does not guarantee whether it is synchronous or buffered. The following variants give explicit control: MPI_Bsend(), MPI_Rsend(), MPI_Ssend(), MPI_Isend().

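  1. MPI_Bsend() : (Buffered) Copies the data to a user-attached buffer and returns without waiting for a matching receive.
  2. MPI_Rsend() : (Ready) May only be called if the matching receive has already been posted; otherwise the call is erroneous.
  3. MPI_Ssend() : (Synchronous) Returns only after the matching receive has started receiving the message.
  4. MPI_Send() : (Standard) Nothing guaranteed; the implementation may buffer the message or wait for the receiver, depending on the situation.
  5. MPI_Isend() : (Asynchronous) Non-blocking send. Starts the send and returns immediately with a request handle, a ticket that is used later to check for completion.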

Asynchronous send

int MPI_Isend(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request *request)

It is almost the same as MPI_Send(), but it returns immediately and uses request as a ticket to check later whether the send has completed. The buffer must not be modified until then.

Asynchronous receive

int MPI_Irecv(void* buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Request *request)

Like the send operation, the receive operation has an asynchronous version. It is almost the same as MPI_Recv(), but it uses request as a ticket to check later whether the receive has completed. The contents of buf are valid only after completion.

Wait

int MPI_Wait(MPI_Request *request, MPI_Status *status) 

MPI_Wait blocks until an asynchronous operation has completed. It takes the ticket (request) that was returned by the asynchronous operation.

Wait all

int MPI_Waitall(int count, MPI_Request array_of_requests[], MPI_Status array_of_statuses[])

MPI_Waitall is a generalized version of MPI_Wait. It waits on multiple tickets at once and returns when all of them have completed.

Test

int MPI_Test(MPI_Request *request, int *flag, MPI_Status *status)

A non-blocking version of MPI_Wait. It returns immediately and sets flag to indicate whether the operation has completed.

Testall

int MPI_Testall(int count, MPI_Request array_of_requests[], int *flag, MPI_Status array_of_statuses[])

A generalized version of MPI_Test for multiple requests. Note that it uses a single flag, so it succeeds only if all requests have completed.

Example

A complete MPI example looks like the following.

#include <mpi.h>
#include <cstdio>

int main(int argc, char* argv[]) {
    // Initialize MPI
    MPI_Init(&argc, &argv);

    // Get the number of processes (ranks)
    int num_ranks;
    MPI_Comm_size(MPI_COMM_WORLD, &num_ranks);

    // Get the rank (index) of this process
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    // Get the processor (host) name
    int len;
    char hostname[MPI_MAX_PROCESSOR_NAME];
    MPI_Get_processor_name(hostname, &len);

    printf("hostname: %s, I am %d/%d\n", hostname, rank, num_ranks);

    // Prepare the buffers
    constexpr int MSGLEN = 30;
    char *recvbuf = new char[MSGLEN];
    char *sendmsg = new char[MSGLEN];

    // Number of send/receive rounds
    constexpr int REPEAT = 10;

    // The iteration index is used as the message tag
    for (int i = 0; i < REPEAT; i++) {
        if (rank % 2 == 0) { // Even ranks send to the next rank
            if (rank + 1 < num_ranks) { // Skip if there is no odd partner
                // snprintf truncates instead of overflowing when the hostname is long
                snprintf(sendmsg, MSGLEN, "hello from %s_%d #%d", hostname, rank, i);
                MPI_Send(sendmsg, MSGLEN, MPI_CHAR, rank + 1, i, MPI_COMM_WORLD);
            }
        } else { // Odd ranks receive from the previous rank
            MPI_Status status;
            MPI_Recv(recvbuf, MSGLEN, MPI_CHAR, rank - 1, i, MPI_COMM_WORLD, &status);
            printf("#%d:recved %d bytes : %s\n", rank, MSGLEN, recvbuf);
        }
    }
    // With an odd number of ranks, the last (even) rank has no partner and stays idle.

    delete[] recvbuf;
    delete[] sendmsg;

    // Shut down MPI
    MPI_Finalize();
    return 0;
}

MPI provides built-in support for common collective communication patterns.

Broadcast

int MPI_Bcast(void *message, int count, MPI_Datatype type, int root, MPI_Comm comm) 

|        | Worker 1 | Worker 2 | Worker 3 | Worker 4 |
|--------|----------|----------|----------|----------|
| Before | A        |          |          |          |
| After  | A        | A        | A        | A        |

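Broadcast is the simplest way to send data to all processes. It sends the contents of the message buffer on the root process to every other process, which receives it into its own message buffer. All processes in the communicator must make the call, but it is not guaranteed to act as a synchronization point.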

Reduce

int MPI_Reduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype type, MPI_Op op, int root, MPI_Comm comm)

MPI_Reduce combines the data from all processes using op and stores the result in recvbuf on the root process. For example, if op is MPI_SUM, it sums the sendbuf values of all processes element by element. The following op types are available.

  1. MPI_MAX : max operation
  2. MPI_MIN : min operation
  3. MPI_MAXLOC : max operation with indexing
  4. MPI_MINLOC : min operation with indexing
  5. MPI_SUM : sum operation
  6. MPI_PROD : product operation
  7. MPI_LAND : logical and operation
  8. MPI_BAND : bit-wise and operation
  9. MPI_LOR : logical or operation
  10. MPI_BOR : bit-wise or operation
  11. MPI_LXOR : logical xor operation
  12. MPI_BXOR : bit-wise xor operation

|        | Worker 1         | Worker 2 | Worker 3 | Worker 4 |
|--------|------------------|----------|----------|----------|
| Before | A                | B        | C        | D        |
| After  | A op B op C op D | B        | C        | D        |

Scatter

int MPI_Scatter(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)

Similar to broadcast, but instead of sending the same data to all processors, it splits the data and distributes each chunk to one processor.

|        | Worker 1 | Worker 2 | Worker 3 | Worker 4 |
|--------|----------|----------|----------|----------|
| Before | A,B,C,D  |          |          |          |
| After  | A        | B        | C        | D        |

Gather

int MPI_Gather(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)

Similar to reduce, but it simply collects data from all processors without applying any operation.

|        | Worker 1 | Worker 2 | Worker 3 | Worker 4 |
|--------|----------|----------|----------|----------|
| Before | A        | B        | C        | D        |
| After  | A,B,C,D  |          |          |          |

All Gather

int MPI_Allgather(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm)

Equivalent to a Gather followed by a Broadcast: collects all data and distributes the full dataset to every processor.

|        | Worker 1 | Worker 2 | Worker 3 | Worker 4 |
|--------|----------|----------|----------|----------|
| Before | A        | B        | C        | D        |
| After  | A,B,C,D  | A,B,C,D  | A,B,C,D  | A,B,C,D  |

Reduce Scatter

Equivalent to performing a reduce and a scatter simultaneously. Each processor contributes data, and each receives the result of the reduction for its portion.

int MPI_Reduce_scatter(const void *sendbuf, void *recvbuf, const int recvcounts[], MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)

|        | Worker 1 | Worker 2 | Worker 3 | Worker 4 |
|--------|----------|----------|----------|----------|
| Before | $A_1,B_1,C_1,D_1$ | $A_2,B_2,C_2,D_2$ | $A_3,B_3,C_3,D_3$ | $A_4,B_4,C_4,D_4$ |
| After  | $A_1$ op $A_2$ op $A_3$ op $A_4$ | $B_1$ op $B_2$ op $B_3$ op $B_4$ | $C_1$ op $C_2$ op $C_3$ op $C_4$ | $D_1$ op $D_2$ op $D_3$ op $D_4$ |

All Reduce

Equivalent to performing a reduce and broadcasting the result to all processors simultaneously.

int MPI_Allreduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)

|        | Worker 1 | Worker 2 | Worker 3 | Worker 4 |
|--------|----------|----------|----------|----------|
| Before | $A_1,B_1,C_1,D_1$ | $A_2,B_2,C_2,D_2$ | $A_3,B_3,C_3,D_3$ | $A_4,B_4,C_4,D_4$ |
| After  | $A_1$ op $A_2$ op $A_3$ op $A_4$, $B_1$ op $B_2$ op $B_3$ op $B_4$, $C_1$ op $C_2$ op $C_3$ op $C_4$, $D_1$ op $D_2$ op $D_3$ op $D_4$ | $A_1$ op $A_2$ op $A_3$ op $A_4$, $B_1$ op $B_2$ op $B_3$ op $B_4$, $C_1$ op $C_2$ op $C_3$ op $C_4$, $D_1$ op $D_2$ op $D_3$ op $D_4$ | $A_1$ op $A_2$ op $A_3$ op $A_4$, $B_1$ op $B_2$ op $B_3$ op $B_4$, $C_1$ op $C_2$ op $C_3$ op $C_4$, $D_1$ op $D_2$ op $D_3$ op $D_4$ | $A_1$ op $A_2$ op $A_3$ op $A_4$, $B_1$ op $B_2$ op $B_3$ op $B_4$, $C_1$ op $C_2$ op $C_3$ op $C_4$, $D_1$ op $D_2$ op $D_3$ op $D_4$ |

All to All

Equivalent to transposing the data matrix across processors.

int MPI_Alltoall(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm)

|        | Worker 1 | Worker 2 | Worker 3 | Worker 4 |
|--------|----------|----------|----------|----------|
| Before | $A_1,B_1,C_1,D_1$ | $A_2,B_2,C_2,D_2$ | $A_3,B_3,C_3,D_3$ | $A_4,B_4,C_4,D_4$ |
| After  | $A_1,A_2,A_3,A_4$ | $B_1,B_2,B_3,B_4$ | $C_1,C_2,C_3,C_4$ | $D_1,D_2,D_3,D_4$ |

Other variants of these functions

Note that the collective functions above have non-blocking variants (prefix I, e.g., MPI_Igather), and several of them have variable-size variants (suffix v, e.g., MPI_Alltoallv). Variable-size variants allow each processor to send or receive different amounts of data.

Communicator Groups

By default, all processors participate in collective operations via MPI_COMM_WORLD. However, you can create sub-groups or custom communicators in code. An example is shown below.

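// Fragment: assumes an initialized MPI program with at least 5 ranks, where rank
// is this process's world rank and buf is an int buffer of MSGLEN elements.
MPI_Group world_group;
MPI_Comm_group(MPI_COMM_WORLD, &world_group); // Group containing all ranks
MPI_Group mpi_local_group;
int num_participants = 3;
int participants[3] = {0, 2, 4}; // World ranks that form the sub-group
MPI_Group_incl(world_group, num_participants, participants, &mpi_local_group);
MPI_Comm new_comm;
MPI_Comm_create_group(MPI_COMM_WORLD, mpi_local_group, 0, &new_comm);
if (new_comm != MPI_COMM_NULL) { // Continue only with a valid communicator
    int local_rank;
    MPI_Comm_rank(new_comm, &local_rank); // Rank within the sub-group
    printf("%d:local :%d\n", rank, local_rank);
    MPI_Bcast(buf, MSGLEN, MPI_INT, 0, new_comm); // Broadcast within the sub-group only
    MPI_Comm_free(&new_comm);
}
// Release the group handles once they are no longer needed
MPI_Group_free(&mpi_local_group);
MPI_Group_free(&world_group);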