MPIを活用した並列GEMMの実装パターンとパフォーマンス特性

分散環境における行列積のデータ配置戦略

大規模行列の一般行列積(GEMM)計算において、逐次実装の計算量O(M×P×N)はプロセッサ数が線形増加しても通信オーバヘッドによってスケールしない。分散メモリシステムでは、演算対象行列をプロセスグリッド上に分配し、通信と並列計算を重叠させるアーキテクチャ設計が必須となる。ここでは1次元ブロック循環型と2次元直交グリッド型の2つの代表的な実装パターンを示す。

手法1:1次元宇宙分割とリングシフト法の実装

共有軸(K軸)に対してプロセス数だけブロック分割を行う。各行列のローカルブロックを準備後、演算毎に隣接プロセッサへデータを送信し循環させる。この手法では散在するメモリ領域を対象に通信を行うため、MPIカスタムデータ型(Vector+Resized)を使用して連続したバイトストリームとして送信する。

#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include "mpi.h"
#include "InitMatrix.h"
#include "MatrixMultiply.h"
#include "PrintMatrix.h"

#define DIM_M 6
#define DIM_K 6
#define DIM_N 6

static void dump_block(const float *data, int count) {
    for(int i = 0; i < count; ++i) printf("%.3f\t", data[i]);
    printf("\n");
}

int main(int argc, char *argv[]) {
    int proc_id, total_procs;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &proc_id);
    MPI_Comm_size(MPI_COMM_WORLD, &total_procs);

    /* カスタムデータ型定義:B行列の縦断ち切りブロックを表現 */
    MPI_Datatype chunk_type, original_block;
    MPI_Type_vector(DIM_M / total_procs, DIM_K, DIM_N, MPI_FLOAT, &original_block);
    MPI_Type_create_resized(original_block, 0, sizeof(float), &chunk_type);
    MPI_Type_commit(&chunk_type);

    float *global_A = NULL, *global_B = NULL, *global_C = NULL;
    float *local_A   = NULL, *local_B   = NULL, *local_C   = NULL, *work_buf = NULL;

    if(proc_id == 0) {
        global_A = malloc(DIM_M * DIM_K * sizeof(float));
        global_B = malloc(DIM_K * DIM_N * sizeof(float));
        global_C = malloc(DIM_M * DIM_N * sizeof(float));
        memset(global_C, 0, DIM_M * DIM_N * sizeof(float));
        InitMatrixA(global_A, DIM_M, DIM_K);
        InitMatrixB(global_B, DIM_K, DIM_N);
    }

    local_A   = malloc((DIM_M * DIM_K) / total_procs * sizeof(float));
    local_B   = malloc((DIM_K * DIM_N) / total_procs * sizeof(float));
    local_C   = malloc((DIM_M * DIM_N) / total_procs * sizeof(float));
    work_buf  = malloc((DIM_M * DIM_N) / total_procs * sizeof(float));
    memset(local_C, 0, (DIM_M * DIM_N) / total_procs * sizeof(float));

    /* Scatterv用のメタデータ初期化 */
    int recv_counts[total_procs], displacements[total_procs];
    for(int i = 0; i < total_procs; ++i) {
        displacements[i] = i * (DIM_K / total_procs);
        recv_counts[i] = 1;
    }

    /* データの一次元分散 */
    MPI_Scatter(global_A, (DIM_M * DIM_K) / total_procs, MPI_FLOAT, local_A, (DIM_M * DIM_K) / total_procs, MPI_FLOAT, 0, MPI_COMM_WORLD);
    MPI_Scatterv(global_B, recv_counts, displacements, chunk_type, local_B, (DIM_K * DIM_N) / total_procs, MPI_FLOAT, 0, MPI_COMM_WORLD);

    int prev_rank = (proc_id > 0) ? (proc_id - 1) : (total_procs - 1);
    int next_rank = (proc_id < total_procs - 1) ? (proc_id + 1) : 0;
    MPI_Status stat_info;

    double t_start = MPI_Wtime();
    for(int phase = 0; phase < total_procs; ++phase) {
        /* ローカル行列積の実行 */
        MatrixMultiply(local_A, local_B, work_buf, DIM_M / total_procs, DIM_K, DIM_N / total_procs);

        /* 結果を論理座標に応じて蓄積 */
        int target_offset = (proc_id + phase) % total_procs;
        for(int r = 0; r < DIM_M / total_procs; ++r) {
            for(int c = 0; c < DIM_N / total_procs; ++c) {
                local_C[target_offset * (DIM_N / total_procs) + c] += work_buf[r * (DIM_N / total_procs) + c];
            }
        }

        /* B行列を逆方向へ循環転送(次に必要なブロックを取り込む) */
        MPI_Sendrecv_replace(local_B, (DIM_K * DIM_N) / total_procs, MPI_FLOAT, prev_rank, 1, next_rank, 1, MPI_COMM_WORLD, &stat_info);
    }
    double t_end = MPI_Wtime();

    /* 結果を集約 */
    MPI_Gatherv(local_C, (DIM_M * DIM_N) / total_procs, MPI_FLOAT, global_C, recv_counts, displacements, chunk_type, 0, MPI_COMM_WORLD);

    if(proc_id == 0) {
        printf("分散GEMM実行時間: %.6f sec\n", t_end - t_start);
        PrintMatrix(global_C, DIM_M, DIM_N);
        free(global_A); free(global_B); free(global_C);
    }
    free(local_A); free(local_B); free(local_C); free(work_buf);
    MPI_Type_free(&chunk_type);
    MPI_Finalize();
    return 0;
}

手法2:Cannonアルゴリズムによる2次元グリッド展開

プロセス群を2次元正方トポロジ上に配置し、行列AとBをそれぞれ行と列方向に循環シフトさせることでデータ局所性を高める。各ステップで自身を持つブロックスカラー積を実行し、移動と平行して進行する。この方式は通信経路が予測可能であり、バースト通信を抑止できる利点がある。

#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <math.h>
#include "mpi.h"
#include "InitMatrix.h"
#include "MatrixMultiply.h"
#include "PrintMatrix.h"

#define BLK_M 4
#define BLK_K 4
#define BLK_N 4

int main(int argc, char *argv[]) {
    int rank, size;
    MPI_Comm cart_grid;
    MPI_Status req_stat;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* 2次元カートルート生成(正方形グリッド想定) */
    int grid_dim = (int)sqrt(size);
    int dims[2] = {grid_dim, grid_dim};
    int periods[2] = {1, 1};
    MPI_Cart_create(MPI_COMM_WORLD, 2, dims, periods, 0, &cart_grid);

    int dir_left, dir_right, dir_up, dir_down;
    MPI_Cart_shift(cart_grid, 0, 1, &dir_left, &dir_right);
    MPI_Cart_shift(cart_grid, 1, 1, &dir_up, &dir_down);

    /* ブロック単位のカスタムデータ型 */
    MPI_Datatype blk_type, raw_slice;
    MPI_Type_vector(BLK_N/grid_dim, BLK_N/grid_dim, BLK_N, MPI_FLOAT, &raw_slice);
    MPI_Type_create_resized(raw_slice, 0, sizeof(float), &blk_type);
    MPI_Type_commit(&blk_type);

    float *gA = NULL, *gB = NULL, *gC = NULL;
    float *lA = NULL, *lB = NULL, *lC = NULL;

    if(rank == 0) {
        gA = malloc(BLK_M * BLK_K * sizeof(float));
        gB = malloc(BLK_K * BLK_N * sizeof(float));
        gC = malloc(BLK_M * BLK_N * sizeof(float));
        InitMatrixA(gA, BLK_M, BLK_K);
        InitMatrixB(gB, BLK_K, BLK_N);
    }

    lA = malloc((BLK_M * BLK_K) / size * sizeof(float));
    lB = malloc((BLK_K * BLK_N) / size * sizeof(float));
    lC = malloc((BLK_M * BLK_N) / size * sizeof(float));
    memset(lC, 0, (BLK_M * BLK_N) / size * sizeof(float));

    /* 分散配置用のオフセットテーブル */
    int scnt[size], sdisp[size];
    for(int i=0; i<grid_dim; ++i) {
        for(int j=0; j<grid_dim; ++j) {
            int idx = i * grid_dim + j;
            sdisp[idx] = i * (BLK_N * BLK_M) / grid_dim + j * (BLK_M / grid_dim);
            scnt[idx] = 1;
        }
    }

    MPI_Scatterv(gA, scnt, sdisp, blk_type, lA, (BLK_M * BLK_K)/size, MPI_FLOAT, 0, MPI_COMM_WORLD);
    MPI_Scatterv(gB, scnt, sdisp, blk_type, lB, (BLK_K * BLK_N)/size, MPI_FLOAT, 0, MPI_COMM_WORLD);

    double t_init = MPI_Wtime();
    for(int step = 0; step < grid_dim; ++step) {
        /* 固定サイズのブロック積計算 */
        MatrixMultiply(lA, lB, lC, BLK_M/grid_dim, BLK_K/grid_dim, BLK_N/grid_dim);

        /* 正方グリッド上でのデータ巡回 */
        MPI_Sendrecv_replace(lA, (BLK_M * BLK_K)/size, MPI_FLOAT, dir_down, 10, dir_up, 10, cart_grid, &req_stat);
        MPI_Sendrecv_replace(lB, (BLK_K * BLK_N)/size, MPI_FLOAT, dir_left, 11, dir_right, 11, cart_grid, &req_stat);
    }
    double t_fin = MPI_Wtime();

    MPI_Gatherv(lC, (BLK_M * BLK_N)/size, MPI_FLOAT, gC, scnt, sdisp, blk_type, 0, MPI_COMM_WORLD);

    if(rank == 0) {
        printf("Cannonアルゴリズム実行時間: %.6f sec\n", t_fin - t_init);
        PrintMatrix(gC, BLK_M, BLK_N);
        free(gA); free(gB); free(gC);
    }
    free(lA); free(lB); free(lC);
    MPI_Type_free(&blk_type);
    MPI_Comm_free(&cart_grid);
    MPI_Finalize();
    return 0;
}

動作特性と最適化の考慮事項

非ブロック通信モデルを採用した場合、MPI操作のキューイングによりネットワークパイプラインが埋まり、計算コアのアイドリング時間が短縮される。特に行列階数が1024以上になると、L2/L3キャッシュの物理容量限界(通常2MB〜8MB帯)を超えるため、ブロッキングサイズをメモリ階層に合わせて再分割しない場合、キャッシュミスによるレイテンシ増大が発生し、実行時間が非線形に劣化する。

プロセス総数が増加すると通信パス長が拡大する傾向にあるが、2次元トポロジを使用するCannon系アルゴリズムはO(√P)の通信複雑度を持ち、1次元リング手法(O(P))と比較して高スケーラビリティを示す。しかし、階数512未満の小規模行列では通信オーバーヘッドが演算時間を凌駕するため、動的なアルゴリズム切り替えまたはタイルサイズ調節が必要となる。実装上はブロック境界の整数除算やメモリ整列(alignment)を検証することで、アセンブリレベルでのレジスタ割り当て効率を向上させることができる。

タグ: mpi gemm cannons-algorithm parallel-matrix-computation custom-mpi-types

5月15日 14:54 投稿