
MPI and Distributed Computing: Message Passing Across Thousands of Nodes

How the Message Passing Interface enables parallel programs to communicate across distributed-memory clusters using point-to-point and collective operations.

2025-07-22
HPC & Clusters, mpi, distributed-computing, message-passing

Terminology

| Term | Definition |
| --- | --- |
| MPI | Message Passing Interface: a standardized specification for communication between processes running on distributed-memory systems |
| Rank | A unique integer identifier (0 to $p-1$) assigned to each process in an MPI communicator, where $p$ is the total number of processes |
| Communicator | A group of MPI processes that can communicate with each other; MPI_COMM_WORLD is the default communicator containing all processes |
| Point-to-point communication | Direct message exchange between two specific processes using send and receive operations |
| Collective operation | A communication pattern involving all processes in a communicator, such as broadcast, scatter, gather, or reduce |
| Broadcast | A collective operation where one process (the root) sends the same data to all other processes in the communicator |
| Scatter | A collective operation where the root process divides data into equal chunks and sends one chunk to each process |
| Gather | A collective operation where each process sends data to the root, which assembles all chunks into a single buffer |
| Reduce | A collective operation that combines data from all processes using an operator (sum, max, min) and stores the result at the root |
| All-reduce | A reduce operation where the result is distributed to all processes, not just the root; equivalent to reduce followed by broadcast |

What & Why

On a distributed-memory cluster, each node has its own private memory. Process 0 on Node A cannot read a variable stored by Process 1 on Node B. To cooperate on a parallel computation, processes must explicitly exchange data by sending and receiving messages over the network. MPI is the standard that defines how this communication works.

MPI matters because it is the universal language of HPC parallelism. First standardized in 1994, MPI provides a portable, efficient, and well-tested API for writing parallel programs that run on everything from a laptop (multiple processes on one machine) to the largest supercomputers (millions of cores across thousands of nodes). Nearly every large-scale scientific simulation, from climate models to molecular dynamics, is built on MPI.

The key insight behind MPI is that it separates the logical communication pattern from the physical hardware. A program written with MPI_Send and MPI_Recv works whether the processes are on the same machine (communicating through shared memory) or on different continents (communicating over a network). The MPI library handles the details of the underlying transport.

How It Works

The SPMD Model

MPI programs follow the Single Program, Multiple Data (SPMD) model. Every process runs the same executable, but each process has a unique rank that determines which portion of the data it works on and which code paths it takes. A typical MPI program starts by initializing MPI, querying its rank and the total number of processes, doing its work (with communication as needed), and finalizing MPI.
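The SPMD idea can be sketched without any MPI library. In this plain-Python illustration, `spmd_main` and `launch` are hypothetical names, and the loop in `launch` stands in for the separate processes that an MPI launcher would start; the strided data split is one possible choice, not something MPI prescribes.

```python
def spmd_main(rank: int, size: int, data: list) -> dict:
    """The single program that every rank executes."""
    # Rank selects the data: a strided share of the input.
    my_items = data[rank::size]
    result = {"rank": rank, "local_sum": sum(my_items)}
    # Rank also selects the code path: only rank 0 acts as coordinator.
    if rank == 0:
        result["role"] = "coordinator"
    else:
        result["role"] = "worker"
    return result


def launch(size: int, data: list) -> list:
    """Stand-in for a launcher: runs the same function once per rank."""
    return [spmd_main(rank, size, data) for rank in range(size)]


if __name__ == "__main__":
    for out in launch(4, list(range(10))):
        print(out)
```

Every "rank" runs identical code; the only input that differs is `rank`, which is enough to give each one a different slice of the data and a different role.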

Point-to-Point Communication

The most basic MPI operations are Send and Receive. Process A calls Send to transmit a buffer of data to Process B. Process B calls Receive to accept that data. Both operations specify the source/destination rank, a tag (to distinguish different messages between the same pair), and the communicator.

Blocking send/receive: the call does not return until the operation is locally complete. Send returns when the send buffer can be reused (the data has been copied to a system buffer or transmitted), which does not guarantee that the receiver has received it yet. Receive returns when the data has arrived in the user's buffer.

Non-blocking send/receive: the call returns immediately, and the program can do other work while the communication happens in the background. The program must later call Wait or Test to confirm completion before using the buffer.
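The difference between the two modes can be imitated with threads and queues from the Python standard library. This is an analogy only, not MPI: the `send`, `recv`, and `irecv` helpers are hypothetical, a queue per (source, destination, tag) triple stands in for the transport, and a future plays the role of an MPI request.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

# One mailbox per (source, dest, tag) triple; a stand-in for the transport.
_mailboxes = {}
_lock = threading.Lock()


def _mailbox(source, dest, tag):
    with _lock:
        return _mailboxes.setdefault((source, dest, tag), queue.Queue())


def send(buf, source, dest, tag=0):
    """Buffered blocking send: returns once the data has been copied out."""
    _mailbox(source, dest, tag).put(list(buf))


def recv(source, dest, tag=0, timeout=5.0):
    """Blocking receive: returns only when a matching message has arrived."""
    return _mailbox(source, dest, tag).get(timeout=timeout)


def irecv(pool, source, dest, tag=0):
    """Non-blocking receive: returns a future at once; .result() acts as Wait."""
    return pool.submit(recv, source, dest, tag)


def demo():
    with ThreadPoolExecutor(max_workers=2) as pool:
        request = irecv(pool, source=0, dest=1, tag=7)  # "rank 1" posts the receive
        local_work = sum(range(1000))                   # and keeps computing
        send([1, 2, 3], source=0, dest=1, tag=7)        # "rank 0" sends
        message = request.result()                      # Wait: block until complete
    return local_work, message
```

Because `send` copies the buffer before returning, the caller may overwrite it immediately, which mirrors the "buffer can be reused" rule for blocking sends. The tag keeps unrelated messages between the same pair of ranks from being confused.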

Collective Operations

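Figure: the five collectives on three ranks.
Broadcast: rank 0 holds [A]; afterwards R0, R1, and R2 each hold [A].
Scatter: rank 0 holds [A,B,C]; afterwards R0 holds [A], R1 holds [B], R2 holds [C].
Gather: R0 holds [A], R1 holds [B], R2 holds [C]; afterwards rank 0 holds [A,B,C].
Reduce (SUM): R0 holds [3], R1 holds [5], R2 holds [7]; afterwards rank 0 holds [15].
All-Reduce (SUM): R0 holds [3], R1 holds [5], R2 holds [7]; afterwards all processes receive the same result, [15].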

Collective operations involve all processes in a communicator and are the workhorses of parallel algorithms:

Broadcast: one root process sends the same data to every other process. Used to distribute configuration, parameters, or shared input data.

Scatter: the root divides an array into equal-sized chunks and sends one chunk to each process. Used to distribute work (e.g., each process gets a portion of a matrix).

Gather: the inverse of scatter. Each process sends its local result to the root, which assembles them into a single array. Used to collect results after parallel computation.

Reduce: each process contributes a value, and MPI combines them using an operator (SUM, MAX, MIN, etc.), storing the result at the root. Used for computing global sums, finding global maxima, etc.

All-reduce: like reduce, but the result is distributed to all processes. This is the most important collective for iterative algorithms (like gradient descent in distributed ML training), where every process needs the global result to proceed to the next iteration.
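The semantics of the five collectives described above can be written down as a few plain-Python reference functions. This is a sketch of what each operation computes, not how an MPI library implements it: a list indexed by rank stands in for the per-process buffers, and the function names are illustrative.

```python
from functools import reduce as fold
import operator


def broadcast(values, root=0):
    """Every rank ends up with the root's value."""
    return [values[root] for _ in values]


def scatter(root_buffer, size):
    """Root splits its buffer into `size` equal chunks, one per rank."""
    if len(root_buffer) % size != 0:
        raise ValueError("buffer length must be divisible by the number of ranks")
    chunk = len(root_buffer) // size
    return [root_buffer[r * chunk:(r + 1) * chunk] for r in range(size)]


def gather(local_chunks):
    """Root concatenates the per-rank chunks in rank order."""
    return [x for chunk in local_chunks for x in chunk]


def reduce_to_root(values, op=operator.add):
    """Combine one value per rank; only the root holds the result."""
    return fold(op, values)


def allreduce(values, op=operator.add):
    """Like reduce, but every rank receives the result."""
    total = fold(op, values)
    return [total for _ in values]


if __name__ == "__main__":
    print(scatter(["A", "B", "C"], 3))
    print(reduce_to_root([3, 5, 7]))
    print(allreduce([3, 5, 7]))
```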

Communicators and Topology

MPI_COMM_WORLD is the default communicator containing all processes. Programs can create sub-communicators to partition processes into groups. For example, a 2D grid simulation might create row communicators and column communicators so that processes can do reductions along rows or columns independently. This maps the logical communication pattern to the problem structure.
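The row and column grouping can be sketched in plain Python. The helper below imitates the color/key grouping rule of `MPI_Comm_split` (same color means same sub-communicator, new ranks ordered by key); the function names and the row-major grid layout are assumptions made for this illustration.

```python
from collections import defaultdict


def comm_split(colors, keys=None):
    """Mimic the grouping rule of MPI_Comm_split for world ranks 0..n-1.

    Ranks with the same color land in the same sub-communicator; within it,
    new ranks follow ascending key, with ties broken by world rank.
    Returns {color: [world ranks in new-rank order]}.
    """
    if keys is None:
        keys = list(range(len(colors)))
    groups = defaultdict(list)
    for world_rank, color in enumerate(colors):
        groups[color].append(world_rank)
    return {c: sorted(members, key=lambda r: (keys[r], r))
            for c, members in groups.items()}


def grid_communicators(rows, cols):
    """Row and column groups for a row-major rows x cols process grid."""
    world = range(rows * cols)
    row_comms = comm_split([r // cols for r in world])  # color = row index
    col_comms = comm_split([r % cols for r in world])   # color = column index
    return row_comms, col_comms


if __name__ == "__main__":
    rows, cols = grid_communicators(2, 3)
    print("rows:", rows)
    print("cols:", cols)
```

A reduction over one of the row groups then involves only the processes in that row, leaving the other rows free to proceed independently.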

Complexity Analysis

Communication cost for MPI operations depends on the number of processes $p$, message size $m$, network latency $L$, and bandwidth $B$.

| Operation | Time Complexity | Algorithm |
| --- | --- | --- |
| Send/Recv | $O(L + m/B)$ | Direct transfer |
| Broadcast | $O(\log p \cdot L + m \cdot \frac{p-1}{p} \cdot \frac{1}{B})$ | Binomial tree + pipelining |
| Scatter/Gather | $O(\log p \cdot L + m \cdot \frac{p-1}{p} \cdot \frac{1}{B})$ | Binomial tree |
| Reduce | $O(\log p \cdot (L + m/B))$ | Binomial tree |
| All-reduce | $O(\log p \cdot L + 2 \cdot \frac{p-1}{p} \cdot \frac{m}{B})$ | Recursive halving-doubling; the ring variant has the same bandwidth term but a latency term of $2(p-1) \cdot L$ |

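The broadcast uses a binomial tree: the root sends to one process, then both send to two more, then all four send to four more. The number of processes holding the data doubles at each step, so after $\lceil \log_2 p \rceil$ steps all processes have the data. When every step moves the full message, the total time is: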

$T_{\text{broadcast}} = \lceil \log_2 p \rceil \cdot (L + m/B)$
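The doubling pattern can be checked with a short schedule generator. This is a sketch of one possible binomial-tree ordering (nearest partners first), written in plain Python; real MPI libraries may order the rounds differently, and the function name is illustrative.

```python
import math


def binomial_broadcast_schedule(p, root=0):
    """Return a list of rounds; each round is a list of (sender, receiver).

    Ranks are numbered relative to the root. At the start of a round the
    first `have` relative ranks hold the data, and each of them sends to
    the rank `have` positions further along, if that rank exists.
    """
    rounds = []
    have = 1
    while have < p:
        pairs = []
        for rel in range(have):
            partner = rel + have
            if partner < p:
                pairs.append(((rel + root) % p, (partner + root) % p))
        rounds.append(pairs)
        have *= 2
    return rounds


if __name__ == "__main__":
    for k, pairs in enumerate(binomial_broadcast_schedule(8)):
        print(f"round {k}: {pairs}")
```

For any $p \geq 1$ the number of rounds equals $\lceil \log_2 p \rceil$, and every non-root rank appears exactly once as a receiver.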

For large messages, pipelining splits the message into segments and overlaps communication, achieving near-optimal bandwidth utilization:

$T_{\text{broadcast,pipelined}} \approx \log_2 p \cdot L + \frac{(p-1)}{p} \cdot \frac{m}{B}$
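Plugging numbers into the two broadcast estimates shows when pipelining matters. The sketch below simply evaluates the formulas above; the latency of 1 microsecond and bandwidth of 10 GB/s are assumed values chosen for illustration, not measurements of any particular network.

```python
import math


def t_broadcast_tree(p, m, L, B):
    """Binomial tree: ceil(log2 p) rounds, each moving the full message."""
    return math.ceil(math.log2(p)) * (L + m / B)


def t_broadcast_pipelined(p, m, L, B):
    """Pipelined estimate: latency term plus roughly one message time."""
    return math.log2(p) * L + (p - 1) / p * m / B


if __name__ == "__main__":
    p, L, B = 64, 1e-6, 1e10  # assumed: 64 ranks, 1 us latency, 10 GB/s
    for m in (8, 1e6, 1e9):   # message sizes in bytes
        tree = t_broadcast_tree(p, m, L, B)
        pipe = t_broadcast_pipelined(p, m, L, B)
        print(f"m={m:>12.0f} B  tree={tree:.6e} s  pipelined={pipe:.6e} s")
```

For tiny messages both estimates are dominated by the $\log_2 p \cdot L$ term and are nearly equal; for large messages the tree pays the transfer time $\log_2 p$ times while the pipelined variant pays it roughly once.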

The ring all-reduce (used heavily in distributed deep learning) passes partial sums around a ring of $p$ processes in $2(p-1)$ steps, each transferring $m/p$ bytes:

$T_{\text{allreduce,ring}} = 2(p-1) \cdot \left(L + \frac{m}{p \cdot B}\right)$

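This is bandwidth-optimal: each process sends (and receives) $2 \cdot \frac{p-1}{p} \cdot m$ bytes in total, which approaches $2m$ for large $p$. The trade-off is the latency term $2(p-1) \cdot L$, which grows linearly with $p$, so tree-based algorithms tend to win for small messages.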
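The ring cost formula is easy to evaluate directly. The following sketch separates its latency and bandwidth parts; the function names are illustrative and the parameter values in the example are assumptions, not measurements.

```python
def t_allreduce_ring(p, m, L, B):
    """Ring all-reduce: 2(p-1) steps, each moving m/p bytes."""
    return 2 * (p - 1) * (L + m / (p * B))


def bytes_sent_per_process(p, m):
    """Total bytes each process sends over both phases."""
    return 2 * (p - 1) / p * m


if __name__ == "__main__":
    m, L, B = 1e9, 1e-6, 1e10  # assumed: 1 GB buffer, 1 us latency, 10 GB/s
    for p in (2, 8, 64, 1024):
        latency_part = 2 * (p - 1) * L
        total = t_allreduce_ring(p, m, L, B)
        print(f"p={p:>5}  latency part={latency_part:.6f} s  total={total:.6f} s")
```

The bandwidth part stays below $2m/B$ however many processes join, while the latency part keeps growing with $p$.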

Implementation

ALGORITHM MPIParallelSum(localData, comm)
INPUT: localData: array of numbers local to this process,
       comm: MPI communicator
OUTPUT: global sum available at root (rank 0)
BEGIN
  rank <- MPI_COMM_RANK(comm)
  size <- MPI_COMM_SIZE(comm)

  // Each process computes its local sum
  localSum <- 0
  FOR EACH value IN localData DO
    localSum <- localSum + value
  END FOR

  // Reduce all local sums to rank 0
  globalSum <- MPI_REDUCE(localSum, SUM, root=0, comm)

  IF rank = 0 THEN
    RETURN globalSum
  END IF
END

ALGORITHM MPIScatterComputeGather(data, computeFunc, comm)
INPUT: data: full dataset (only valid at root),
       computeFunc: function to apply to each chunk,
       comm: MPI communicator
OUTPUT: combined results at root
BEGIN
  rank <- MPI_COMM_RANK(comm)
  size <- MPI_COMM_SIZE(comm)

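  // Only the root holds data, so it shares the length with all ranks first
  n <- MPI_BCAST(LENGTH(data) at root, root=0, comm)

  // Root scatters equal chunks to all processes (assumes n is divisible by size)
  chunkSize <- n / size
  localChunk <- MPI_SCATTER(data, chunkSize, root=0, comm)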

  // Each process computes on its chunk
  localResult <- computeFunc(localChunk)

  // Gather all results back to root
  allResults <- MPI_GATHER(localResult, root=0, comm)

  IF rank = 0 THEN
    RETURN allResults
  END IF
END
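The pseudocode above assumes the data divides evenly among the processes. When it does not, MPI programs typically pass per-rank counts and displacements to `MPI_Scatterv`. The plain-Python sketch below imitates that bookkeeping in a single process; `counts_and_displs` and `scatter_compute_gather` are hypothetical helpers, and the list comprehensions stand in for the actual communication.

```python
def counts_and_displs(n, size):
    """Per-rank chunk sizes and start offsets for n items over `size` ranks.

    The first (n % size) ranks receive one extra item each.
    """
    base, extra = divmod(n, size)
    counts = [base + (1 if r < extra else 0) for r in range(size)]
    displs = [sum(counts[:r]) for r in range(size)]
    return counts, displs


def scatter_compute_gather(data, compute, size):
    """Single-process imitation of scatter, local compute, and gather."""
    counts, displs = counts_and_displs(len(data), size)
    # Scatter: rank r receives data[displs[r] : displs[r] + counts[r]].
    local_chunks = [data[d:d + c] for c, d in zip(counts, displs)]
    # Compute: each rank applies the function to its own chunk.
    local_results = [compute(chunk) for chunk in local_chunks]
    # Gather: the root collects one result per rank, in rank order.
    return local_results


if __name__ == "__main__":
    squares = scatter_compute_gather(
        [1, 2, 3, 4, 5, 6, 7], lambda chunk: [x * x for x in chunk], 3)
    print(squares)
```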

ALGORITHM RingAllReduce(localValue, op, comm)
INPUT: localValue: local data buffer of size m,
       op: reduction operator (SUM, MAX, etc.),
       comm: MPI communicator
OUTPUT: reduced result available at ALL processes
BEGIN
  rank <- MPI_COMM_RANK(comm)
  size <- MPI_COMM_SIZE(comm)
  chunkSize <- LENGTH(localValue) / size

  // Split local buffer into size chunks
  chunks <- SPLIT(localValue, size)
  result <- COPY(chunks)

  // MOD denotes the non-negative modulo. A combined SENDRECV is used because
  // every rank sends at once: a blocking SEND followed by RECV can deadlock.
  leftNeighbor <- (rank - 1 + size) MOD size
  rightNeighbor <- (rank + 1) MOD size

  // Phase 1: Reduce-scatter (p-1 steps)
  // Each step: send one chunk to right neighbor, receive from left, reduce
  FOR step FROM 0 TO size - 2 DO
    sendIdx <- (rank - step) MOD size
    recvIdx <- (rank - step - 1) MOD size

    received <- SENDRECV(result[sendIdx], TO rightNeighbor, FROM leftNeighbor)
    result[recvIdx] <- APPLY(op, result[recvIdx], received)
  END FOR

  // Phase 2: All-gather (p-1 steps)
  // Each step: send reduced chunk to right, receive from left
  FOR step FROM 0 TO size - 2 DO
    sendIdx <- (rank - step + 1) MOD size
    recvIdx <- (rank - step) MOD size

    result[recvIdx] <- SENDRECV(result[sendIdx], TO rightNeighbor, FROM leftNeighbor)
  END FOR

  RETURN CONCATENATE(result)
END
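The chunk indices in the ring algorithm are easy to get wrong, so it helps to simulate all ranks in one process and check the outcome. The sketch below follows the same send and receive indices as the pseudocode; it assumes the buffer length is divisible by the number of ranks, and it models each step as a simultaneous exchange by collecting all outgoing chunks before applying any of them.

```python
import operator


def ring_allreduce(buffers, op=operator.add):
    """Simulate ring all-reduce; buffers[r] is rank r's local list."""
    p = len(buffers)
    n = len(buffers[0])
    if n % p != 0:
        raise ValueError("buffer length must be divisible by the number of ranks")
    c = n // p
    # result[r][i] is rank r's current copy of chunk i.
    result = [[list(buf[i * c:(i + 1) * c]) for i in range(p)] for buf in buffers]

    # Phase 1: reduce-scatter. After p-1 steps, rank r holds the fully
    # reduced chunk (r + 1) % p.
    for step in range(p - 1):
        sent = [result[r][(r - step) % p] for r in range(p)]
        for r in range(p):
            recv_idx = (r - step - 1) % p
            incoming = sent[(r - 1) % p]  # message from the left neighbor
            result[r][recv_idx] = [op(a, b)
                                   for a, b in zip(result[r][recv_idx], incoming)]

    # Phase 2: all-gather. Each reduced chunk travels around the ring.
    for step in range(p - 1):
        sent = [result[r][(r - step + 1) % p] for r in range(p)]
        for r in range(p):
            result[r][(r - step) % p] = list(sent[(r - 1) % p])

    return [[x for chunk in chunks for x in chunk] for chunks in result]


if __name__ == "__main__":
    print(ring_allreduce([[1, 2, 3], [10, 20, 30], [100, 200, 300]]))
```

Python's `%` already returns a non-negative result for a positive modulus, so the negative intermediate values in the index expressions need no extra handling here; in C the same expressions would need `+ size` before the modulo.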

ALGORITHM MPINonBlockingOverlap(localData, comm)
INPUT: localData: local computation data, comm: communicator
OUTPUT: final result with overlapped computation and communication
BEGIN
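  rank <- MPI_COMM_RANK(comm)
  size <- MPI_COMM_SIZE(comm)
  rightNeighbor <- (rank + 1) MOD size
  leftNeighbor <- (rank - 1 + size) MOD size

  // Post the receive first, then start the non-blocking send of boundary data.
  // Each rank sends to its right neighbor, so it must receive from its left.
  recvRequest <- MPI_IRECV(receivedBoundary, FROM leftNeighbor, comm)
  sendRequest <- MPI_ISEND(localData.boundary, TO rightNeighbor, comm)

  // Do interior computation while communication happens
  interiorResult <- COMPUTE_INTERIOR(localData)

  // Wait for both transfers to complete before touching either buffer
  MPI_WAITALL(recvRequest, sendRequest)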

  // Now compute boundary region with received data
  boundaryResult <- COMPUTE_BOUNDARY(interiorResult, receivedBoundary)

  RETURN MERGE(interiorResult, boundaryResult)
END
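The reason overlap works is that interior points do not depend on remote data. The sketch below illustrates this with a three-point sum on a periodic 1D array, using a thread pool as a stand-in for non-blocking communication: futures play the role of MPI requests and `.result()` the role of Wait. All names are hypothetical, and the ranks are processed one after another in a single process.

```python
from concurrent.futures import ThreadPoolExecutor


def smooth_serial(u):
    """Reference: 3-point sum on a periodic 1D array."""
    n = len(u)
    return [u[(i - 1) % n] + u[i] + u[(i + 1) % n] for i in range(n)]


def smooth_rank(local, fetch_left, fetch_right, pool):
    """One rank's step: start halo transfers, compute interior, then edges."""
    # Start the "non-blocking" halo transfers.
    req_left = pool.submit(fetch_left)
    req_right = pool.submit(fetch_right)

    # Interior points need no remote data, so this overlaps with the transfers.
    out = list(local)
    for i in range(1, len(local) - 1):
        out[i] = local[i - 1] + local[i] + local[i + 1]

    # Wait for both halos, then finish the two boundary points.
    left_halo, right_halo = req_left.result(), req_right.result()
    out[0] = left_halo + local[0] + local[1]
    out[-1] = local[-2] + local[-1] + right_halo
    return out


def smooth_distributed(u, size):
    """Split u into `size` equal chunks and process each as one rank."""
    chunk = len(u) // size
    if chunk < 2 or len(u) % size != 0:
        raise ValueError("need equal chunks of at least two points per rank")
    parts = [u[r * chunk:(r + 1) * chunk] for r in range(size)]
    result = []
    with ThreadPoolExecutor(max_workers=2) as pool:
        for r in range(size):
            left, right = parts[(r - 1) % size], parts[(r + 1) % size]
            result += smooth_rank(parts[r],
                                  lambda left=left: left[-1],
                                  lambda right=right: right[0],
                                  pool)
    return result
```

The distributed result matches the serial one, which confirms that deferring only the two boundary points per rank is enough to preserve correctness.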

Real-World Applications

  • Climate and weather simulation: models like WRF and CESM decompose the atmosphere into a 3D grid, with each MPI process computing one sub-domain and exchanging boundary ("halo") cells with neighbors every time step
  • Molecular dynamics: LAMMPS and GROMACS use MPI to distribute atoms across processes, with each process computing forces on its local atoms and communicating with processes holding nearby atoms
  • Computational fluid dynamics: OpenFOAM decomposes meshes across MPI processes, using point-to-point communication for boundary exchange and collectives for convergence checks
  • Distributed deep learning: frameworks like Horovod use MPI all-reduce to synchronize gradients across GPU workers after each training batch, enabling data-parallel training across hundreds of GPUs
  • Astrophysics N-body simulations: codes like GADGET simulate billions of particles representing dark matter and gas, using MPI to distribute particles and tree-based force calculations across thousands of cores
  • Seismic imaging: oil and gas companies run wave-equation solvers on MPI clusters to process seismic survey data, with each process handling a portion of the subsurface model

Key Takeaways

  • MPI is the standard for programming distributed-memory parallel systems, providing portable communication primitives that work from laptops to the largest supercomputers
  • Point-to-point operations (Send/Recv) handle direct communication between two processes; non-blocking variants allow overlapping computation with communication
  • Collective operations (broadcast, scatter, gather, reduce, all-reduce) coordinate all processes in a communicator and are the building blocks of most parallel algorithms
  • All-reduce is the most critical collective for iterative algorithms; the ring algorithm achieves a bandwidth-optimal transfer cost of $2 \cdot \frac{p-1}{p} \cdot \frac{m}{B}$, at the price of a latency term that grows linearly with $p$
  • Broadcast uses a binomial tree to distribute data in $O(\log p)$ steps, and pipelining further improves bandwidth utilization for large messages
  • MPI programs follow the SPMD model: every process runs the same code but operates on different data based on its rank