并行计算上机实验二 MPI实现π值的计算和PSRS排序

题目

MPI实现π值的计算和PSRS排序

算法设计

π值的计算

基本的串行算法

http://img.blog.csdn.net/20150416160118625

利用公式arctan(1)=π/4以及(arctan(x))’=1/(1+x^2),在[0, 1]上对f(x)= 4/(1+x^2)求积分,使用积分的定义离散化近似求解。将[0, 1]分成大量的小区间,使用for循环在每个小区间上计算y_i的值,最后求和。

MPI并行算法

在基本的串行算法上,main函数中开始时用MPI_Init初始化,则所有进程都将执行接下来的代码。用MPI_Comm_size获取总进程数,用MPI_Comm_rank获取当前进程id。将y_i的计算并行化,显式地分配for循环,将紧挨着的p个区间的y_i的值分配给p个进程分别计算(而不是将全部区间直接划分给p个处理器,这样可能导致各线程负载不均衡),换言之,当前进程只计算每连续p个区间中的1个。最后用MPI_Reduce将每个进程计算的局部值归约求和,结果在主进程(0号进程)上。

PSRS排序

给定n个元素的数组A[0..n - 1],执行以下步骤

(1)均匀划分:将A[0..n - 1]均匀划分成p段,每个进程pi处理A[(i*n/p..(i+1)*n/p - 1]

(2)局部排序:每个进程pi调用串行排序算法对A[(i*n/p..(i+1)*n/p - 1]排序

(3)选取样本:每个进程pi从自己的有序子数组A[(i*n/p..(i+1)*n/p - 1]中等间距选取p个样本元素,存放在samples数组中

(4)样本排序:用主进程global_samples数组收集(MPI_Gather)各个进程的samples数组,对总共的p2个样本元素进行串行排序

(5)选择主元:用主进程从排好序的p2个样本中选取p - 1个主元,并广播(MPI_Bcast)给其他pi

(6)主元划分:每个进程pi按p - 1个主元将自己的有序子数组A[(i*n/p..(i+1)*n/p - 1]划分成p段(每段长度可能不同),此时,不需要新申请一个二维数组存放p段数据,只需要一个一维数组sizes记录每段的长度和一个一维数组记录每段的起始位置offsets(相对偏移,通过累加前面的段的长度得到)

(7)全局交换:每个进程pi将自己的有序子数组中的每段按段号交换到对应id的进程中,可以利用MPI_Alltoallv函数实现全局交换,之前的长度数组sizes和偏移数组作为参数offsets,另外需要三个数组newsizes、newoffsets和newdatas用于接收数据。

MPI_Alltoallv函数定义如下

int MPI_Alltoallv(const void *sendbuf,    发送缓冲区的起始地址
                  const int *sendcounts,  数组:每个发送数据的长度
                  const int *sdispls,  数组:每个发送数据相对于发送缓冲区起始地址的位移量
                  MPI_Datatype sendtype,  发送的数据类型
                  void *recvbuf,          接收缓冲区的起始地址
                  const int *recvcounts,  数组:每个接收数据的长度
                  const int *rdispls, 数组:每个接收数据相对于接收缓冲区起始地址的位移量
                  MPI_Datatype recvtype,  接收的数据类型
                  MPI_Comm comm)          通信域

该函数实现各个进程向各个进程交换不定长的数据

该函数执行效果举例如下,每个数据的长度不定

交换前
A0 A1 A2 A3 A4 A5    --进程0的sizes数组
B0 B1 B2 B3 B4 B5    --进程1的sizes数组
C0 C1 C2 C3 C4 C5    --进程2的sizes数组
D0 D1 D2 D3 D4 D5    --进程3的sizes数组
E0 E1 E2 E3 E4 E5    --进程4的sizes数组
F0 F1 F2 F3 F4 F5    --进程5的sizes数组
交换后
A0 B0 C0 D0 E0 F0    --进程0的newsizes数组
A1 B1 C1 D1 E1 F1    --进程1的newsizes数组
A2 B2 C2 D2 E2 F2    --进程2的newsizes数组
A3 B3 C3 D3 E3 F3    --进程3的newsizes数组
A4 B4 C4 D4 E4 F4    --进程4的newsizes数组
A5 B5 C5 D5 E5 F5    --进程5的newsizes数组

(8)归并排序:每个进程pi对接收到的元素进行归并排序

(9)用主进程收集(MPI_Gatherv)各个进程的新数据,写回A,此时,原数组A已有序

MPI_Gatherv函数定义如下

int MPI_Gatherv(void* sendbuf,            发送缓冲区的起始地址
                int sendcount,            发送数据的长度
                MPI_Datatype sendtype,    发送的数据类型
                void* recvbuf,            接收缓冲区的起始地址
                int *recvcounts,          数组:每个接收数据的长度
                int *displs,            数组:每个接收数据相对于接收缓冲区起始地址的位移量
                MPI_Datatype recvtype,    接收的数据类型
                int root,                 根进程的id
                MPI_Comm comm)            通信域

该函数实现各个进程向根进程汇集不定长的数据

算法分析

π值的计算

n为[0, 1]之间划分的区间数

对于串行算法,时间复杂度为O(n)

对于4种并行算法,时间复杂度为O(n/p)其中p为处理器数

PSRS排序

(参考文献 Hanmao Shi , Jonathan Schaeffer. Parallel Sorting by Regular Sampling.)

n为A的元素个数,p为处理器数,w = n/p

则时间复杂度为各阶段时间复杂度之和

$O(wlogw+wlogp+plogw+p^2 logp^2)$

若$n >p^3 $, 近似为 $O(\frac{n}{p}logn)$

另外,算法第六步每个处理器的w个数据根据p – 1个主元划分,每段的长度可能不相等,因此数组低维的长度不等,无法实现确定,但文献中证明了每段的长度最长不超过2w个元素。

Theorem 1: In phase 3 of PSRS, each processor merges less than 2w elements.

算法源代码

π值的计算

基本的串行算法
#include <stdio.h>
static long num_steps = 100000;//越大值越精确
double step;
void main(){
    int i;
    double x, pi, sum = 0.0;
    step = 1.0 / (double)num_steps;
    for(i = 1; i <= num_steps; i++){
        x = (i - 0.5) * step;
        sum = sum + 4.0/(1.0 + x * x);
    }
    pi = step * sum;
    printf("%lf\n", pi);
}
使用并行域并行化的程序
#include "mpi.h"
#include <stdio.h>
#include <math.h>
static long num_steps = 100000;
int main(int argc, char *argv[])
{
    int id, num_processes;
    double local_pi, pi, local_sum = 0.0, x;
    double step = 1.0 / (double)num_steps;
    double t1, t2;
    MPI_Init(&argc, &argv);  
    MPI_Comm_size(MPI_COMM_WORLD, &num_processes); 
    MPI_Comm_rank(MPI_COMM_WORLD, &id);  
    if(id == 0) //主进程
        t1 = MPI_Wtime();
    for(int i = id + 1; i <= num_steps; i += num_processes){  //各线程计算自己部分的面积local_pi
        x = step * (i - 0.5);
        local_sum = local_sum + 4.0 / (1.0 + x * x);
    }
    local_pi = local_sum * step;
    MPI_Reduce(&local_pi, &pi, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);  //归约local_pi到pi
    if(id == 0){
        t2 = MPI_Wtime();
        printf("time: %fs\n", t2 - t1);
        printf("PI is approximately %.16f\n", pi);
    }
    MPI_Finalize();
    return 0;
}

#####

PSRS排序

#include <stdlib.h>
#include <stdio.h>
#include "mpi.h"

#define INF 2147483647

/**************************************************
* 合并两个已排好序的子数组A[l : m], A[m + 1 : r], 写回A[l : r]
***************************************************/
void Merge(int *A, int l, int m, int r){
    int i, j, k, n1 = m - l + 1, n2 = r - m;
    int *L = (int *)malloc((n1 + 1) * sizeof(int));
    int *R = (int *)malloc((n2 + 1) * sizeof(int));
    for(i = 0; i < n1; i++) L[i] = A[l + i];
    for(j = 0; j < n2; j++) R[j] = A[m + 1 + j];
    L[i] = R[j] = INF;
    i = j = 0;
    for(k = l; k <= r; k++) if(L[i] <= R[j]) A[k] = L[i++]; else A[k] = R[j++];
    free(L); free(R);
} 

/**************************************************
* 对A[l : r]进行归并排序
***************************************************/
void MergeSort(int *A, int l, int r){
    if(l < r){
        int m = (l + r) / 2;
        MergeSort(A, l, m);
        MergeSort(A, m + 1, r);
        Merge(A, l, m, r);
    }
} 

/**************************************************
* 对A[0 : n - 1]进行PSRS排序
***************************************************/
void PSRS(int *A, int n, int id, int num_processes){ //每个进程都会执行这个函数,这里面的变量每个进程都有一份,因此都是局部的(对于当前进程而言)
    int per; 
    int *samples, *global_samples; //global表示这个变量是主进程会使用的,但事实上每个进程都声明了
    int *pivots; 
    int *sizes, *newsizes;
    int *offsets, *newoffsets;
    int *newdatas;
    int newdatassize;
    int *global_sizes;
    int *global_offsets;
    //-------------------------------------------------------------------------------------------------------------------

    per = n / num_processes; //A的n个元素划分为num_processes段,每个进程处理per个元素
    samples = (int *)malloc(num_processes * sizeof(int)); //当前进程的采样数组
    pivots = (int *)malloc(num_processes * sizeof(int)); //num_processes - 1 个主元,最后一个设为INF,作为哨兵
    if(id == 0){ //主进程申请全局采样数组
        global_samples = (int *)malloc(num_processes * num_processes * sizeof(int)); //正则采样数为 num_processes * num_processes 
    }
    MPI_Barrier(MPI_COMM_WORLD);//设置路障,同步所有进程
    //-------------------------------------------------------------------------------------------------------------------
    //(1)均匀划分,当前进程对A中属于自己的部分进行串行归并排序
    MergeSort(A, id * per, (id + 1) * per - 1); //这里没有把A中对应当前进程的数据复制到当前进程,而是直接对A部分排序
    //(2)正则采样,当前进程选出 num_processes 个样本放在local_sample中
    for(int k = 0; k < num_processes; k++) 
        samples[k] = A[id * per + k * per / num_processes];
    //主进程的sample收集各进程的local_sample,共 num_processes * num_processes 个
    MPI_Gather(samples, num_processes, MPI_INT, global_samples, num_processes, MPI_INT, 0, MPI_COMM_WORLD);
    //-------------------------------------------------------------------------------------------------------------------
    //(3)采样排序 (4)选择主元
    if(id == 0){ //主进程
        MergeSort(global_samples, 0, num_processes * num_processes - 1); //对采样的num_processes * num_processes个样本进行排序
        for(int k = 0; k < num_processes - 1; k++) //选出num_processes - 1个主元
            pivots[k] = global_samples[(k + 1) * num_processes];
        pivots[num_processes - 1] = INF; //哨兵
    }
    //主进程向各个进程广播,所有进程拥有一样的pivots数组
    MPI_Bcast(pivots, num_processes, MPI_INT, 0, MPI_COMM_WORLD);
    MPI_Barrier(MPI_COMM_WORLD);
    //-------------------------------------------------------------------------------------------------------------------
    sizes = (int *)calloc(num_processes, sizeof(int)); //当前进程的per个元素根据主元划分之后的每段的长度,用calloc分配后自动初始化为0
    offsets = (int *)calloc(num_processes, sizeof(int)); //当前进程的per个元素根据主元划分之后的每段的起始位置,用calloc分配后自动初始化为0
    newsizes = (int *)calloc(num_processes, sizeof(int)); //当前进程在进行全局交换之后的每段的长度,用calloc分配后自动初始化为0
    newoffsets = (int *)calloc(num_processes, sizeof(int)); //当前进程在进行全局交换之后的每段的起始位置,用calloc分配后自动初始化为0
    //(5)主元划分
    for(int k = 0, j = id * per; j < id * per + per; j++){ //当前进程对自己的per个元素按主元划分为num_processes段,此处不处理数据,只计算每段大小
        if(A[j] < pivots[k]) //如果之前不用哨兵,最后一段要单独考虑
            sizes[k]++;
        else
            sizes[++k]++;
    }
    //(6)全局交换
    //多对多全局交换消息,首先每个进程向每个接收者发送接收者对应的【段的大小】
    MPI_Alltoall(sizes, 1, MPI_INT, newsizes, 1, MPI_INT, MPI_COMM_WORLD);
    //计算原来的段偏移数组,新的段偏移数组,新的数据大小
    newdatassize = newsizes[0];
    for(int k = 1; k < num_processes; k++){
        offsets[k] = offsets[k - 1] + sizes[k - 1];
        newoffsets[k] = newoffsets[k - 1] + newsizes[k - 1];
        newdatassize += newsizes[k];
    }
    //申请当前进程新的数据空间
    newdatas = (int *)malloc(newdatassize * sizeof(int)); //当前进程在进行全局交换之后的数据,是由交换后的各段组合而成的
    //多对多全局交换消息,每个进程向每个接收者发送接收者对应的【段】
    MPI_Alltoallv(&(A[id * per]), sizes, offsets, MPI_INT, newdatas, newsizes, newoffsets, MPI_INT, MPI_COMM_WORLD);
    MPI_Barrier(MPI_COMM_WORLD);
    //-------------------------------------------------------------------------------------------------------------------
    //(7)当前进程归并排序自己的新数据
    MergeSort(newdatas, 0, newdatassize - 1);
    MPI_Barrier(MPI_COMM_WORLD);
    //(8)主进程收集各个进程的数据,写回A
    //首先收集各进程新数据的大小
    if(id == 0)
        global_sizes = (int *)calloc(num_processes, sizeof(int));
    MPI_Gather(&newdatassize, 1, MPI_INT, global_sizes, 1, MPI_INT, 0, MPI_COMM_WORLD);
    //主进程计算即将搜集的各进程数据的起始位置
    if(id == 0){ 
        global_offsets = (int *)calloc(num_processes, sizeof(int));
        for(int k = 1; k < num_processes; k++)
            global_offsets[k] = global_offsets[k - 1] + global_sizes[k - 1];
    }
    //主进程收集各个进程的数据
    MPI_Gatherv(newdatas, newdatassize, MPI_INT, A, global_sizes, global_offsets, MPI_INT, 0, MPI_COMM_WORLD);
    MPI_Barrier(MPI_COMM_WORLD);
    //-------------------------------------------------------------------------------------------------------------------
    //销毁动态数组
    free(samples); samples = NULL;
    free(pivots); pivots = NULL;
    free(sizes); sizes = NULL;
    free(offsets); offsets = NULL;
    free(newdatas); newdatas = NULL;
    free(newsizes); newsizes = NULL;
    free(newoffsets); newoffsets = NULL;
    if(id == 0){ 
        free(global_samples); global_samples = NULL;
        free(global_sizes); global_sizes = NULL;
        free(global_offsets); global_offsets = NULL;
    }
}

int main(int argc, char *argv[]){
    int A[27] = {15, 46, 48, 93, 39, 6, 72, 91, 14, 36, 69, 40, 89, 61, 97, 12, 21, 54, 53, 97, 84, 58, 32, 27, 33, 72, 20};
    double t1, t2;
    int id, num_processes;
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &num_processes); //获取进程数
    MPI_Comm_rank(MPI_COMM_WORLD, &id); //获取当前进程id
    if(id == 0)
        t1 = MPI_Wtime();
    PSRS(A, 27, id, num_processes);
    if(id == 0){
        t2 = MPI_Wtime();
        printf("time: %lfs\n", t2 - t1);
        for(int i = 0; i < 27; i++)
            printf("%d ", A[i]);
        printf("\n");
    }
    MPI_Finalize();
    return 0;
}

结果

img
img

文章目录
  1. 题目
  2. 算法设计
    1. π值的计算
      1. 基本的串行算法
      2. MPI并行算法
    2. PSRS排序
  3. 算法分析
    1. π值的计算
    2. PSRS排序
  4. 算法源代码
    1. π值的计算
      1. 基本的串行算法
      2. 使用并行域并行化的程序
    2. PSRS排序
  5. 结果
|