并行计算上机实验二 MPI实现π值的计算和PSRS排序
2019-05-16

题目

MPI实现π值的计算和PSRS排序

算法设计

π值的计算

基本的串行算法

![http://img.blog.csdn.net/20150416

160118625](lab2/clip_image003.jpg)

利用公式arctan(1)=π/4以及(arctan(x))’=1/(1+x^2),在[0, 1]上对f(x)= 4/(1+x^2)求积分,使用积分的定义离散化近似求解。将[0, 1]分成大量的小区间,使用for循环在每个小区间上计算y_i的值,最后求和。

MPI并行算法

在基本的串行算法上,main函数中开始时用MPI_Init初始化,则所有进程都将执行接下来的代码。用MPI_Comm_size获取总进程数,用MPI_Comm_rank获取当前进程id。将y_i的计算并行化,显式地分配for循环,将紧挨着的p个区间的y_i的值分配给p个进程分别计算(而不是将全部区间直接划分给p个处理器,这样可能导致各线程负载不均衡),换言之,当前进程只计算每连续p个区间中的1个。最后用MPI_Reduce将每个进程计算的局部值归约求和,结果在主进程(0号进程)上。

PSRS排序

给定n个元素的数组A[0..n - 1],执行以下步骤

(1)均匀划分:将A[0..n - 1]均匀划分成p段,每个进程pi处理A[(i*n/p..(i+1)*n/p - 1]

(2)局部排序:每个进程pi调用串行排序算法对A[(i*n/p..(i+1)*n/p - 1]排序

(3)选取样本:每个进程pi从自己的有序子数组A[(i*n/p..(i+1)*n/p - 1]中等间距选取p个样本元素,存放在samples数组中

(4)样本排序:用主进程global_samples数组收集(MPI_Gather)各个进程的samples数组,对总共的p2个样本元素进行串行排序

(5)选择主元:用主进程从排好序的p2个样本中选取p - 1个主元,并广播(MPI_Bcast)给其他pi

(6)主元划分:每个进程pi按p - 1个主元将自己的有序子数组A[(i*n/p..(i+1)*n/p - 1]划分成p段(每段长度可能不同),此时,不需要新申请一个二维数组存放p段数据,只需要一个一维数组sizes记录每段的长度和一个一维数组记录每段的起始位置offsets(相对偏移,通过累加前面的段的长度得到)

(7)全局交换:每个进程pi将自己的有序子数组中的每段按段号交换到对应id的进程中,可以利用MPI_Alltoallv函数实现全局交换,之前的长度数组sizes和偏移数组作为参数offsets,另外需要三个数组newsizes、newoffsets和newdatas用于接收数据。

MPI_Alltoallv函数定义如下

int MPI_Alltoallv(const void *sendbuf,    发送缓冲区的起始地址
                  const int *sendcounts,  数组:每个发送数据的长度
                  const int *sdispls,  数组:每个发送数据相对于发送缓冲区起始地址的位移量
                  MPI_Datatype sendtype,  发送的数据类型
                  void *recvbuf,          接收缓冲区的起始地址
                  const int *recvcounts,  数组:每个接收数据的长度
                  const int *rdispls, 数组:每个接收数据相对于接收缓冲区起始地址的位移量
                  MPI_Datatype recvtype,  接收的数据类型
                  MPI_Comm comm)          通信域

该函数实现各个进程向各个进程交换不定长的数据

该函数执行效果举例如下,每个数据的长度不定

交换前
A0 A1 A2 A3 A4 A5    --进程0的sizes数组
B0 B1 B2 B3 B4 B5    --进程1的sizes数组
C0 C1 C2 C3 C4 C5    --进程2的sizes数组
D0 D1 D2 D3 D4 D5    --进程3的sizes数组
E0 E1 E2 E3 E4 E5    --进程4的sizes数组
F0 F1 F2 F3 F4 F5    --进程5的sizes数组
交换后
A0 B0 C0 D0 E0 F0    --进程0的newsizes数组
A1 B1 C1 D1 E1 F1    --进程1的newsizes数组
A2 B2 C2 D2 E2 F2    --进程2的newsizes数组
A3 B3 C3 D3 E3 F3    --进程3的newsizes数组
A4 B4 C4 D4 E4 F4    --进程4的newsizes数组
A5 B5 C5 D5 E5 F5    --进程5的newsizes数组

(8)归并排序:每个进程pi对接收到的元素进行归并排序

(9)用主进程收集(MPI_Gatherv)各个进程的新数据,写回A,此时,原数组A已有序

MPI_Gatherv函数定义如下

int MPI_Gatherv(void* sendbuf,            发送缓冲区的起始地址
                int sendcount,            发送数据的长度
                MPI_Datatype sendtype,    发送的数据类型
                void* recvbuf,            接收缓冲区的起始地址
                int *recvcounts,          数组:每个接收数据的长度
                int *displs,            数组:每个接收数据相对于接收缓冲区起始地址的位移量
                MPI_Datatype recvtype,    接收的数据类型
                int root,                 根进程的id
                MPI_Comm comm)            通信域

该函数实现各个进程向根进程汇集不定长的数据

算法分析

π值的计算

n为[0, 1]之间划分的区间数

对于串行算法,时间复杂度为O(n)

对于4种并行算法,时间复杂度为O(n/p)其中p为处理器数

PSRS排序

(参考文献 Hanmao Shi , Jonathan Schaeffer. Parallel Sorting by Regular Sampling.)

n为A的元素个数,p为处理器数,w = n/p

则时间复杂度为各阶段时间复杂度之和

, 近似为

另外,算法第六步每个处理器的w个数据根据p – 1个主元划分,每段的长度可能不相等,因此数组低维的长度不等,无法实现确定,但文献中证明了每段的长度最长不超过2w个元素。

Theorem 1: In phase 3 of PSRS, each processor merges less than 2w elements.

算法源代码

π值的计算

基本的串行算法
#include <stdio.h>
static long num_steps = 100000;//越大值越精确
double step;
void main(){
    int i;
    double x, pi, sum = 0.0;
    step = 1.0 / (double)num_steps;
    for(i = 1; i <= num_steps; i++){
        x = (i - 0.5) * step;
        sum = sum + 4.0/(1.0 + x * x);
    }
    pi = step * sum;
    printf("%lf\n", pi);
}
使用并行域并行化的程序
#include "mpi.h"
#include <stdio.h>
#include <math.h>
static long num_steps = 100000;
int main(int argc, char *argv[])
{
    int id, num_processes;
    double local_pi, pi, local_sum = 0.0, x;
    double step = 1.0 / (double)num_steps;
    double t1, t2;
    MPI_Init(&argc, &argv);  
    MPI_Comm_size(MPI_COMM_WORLD, &num_processes); 
    MPI_Comm_rank(MPI_COMM_WORLD, &id);  
    if(id == 0) //主进程
        t1 = MPI_Wtime();
    for(int i = id + 1; i <= num_steps; i += num_processes){  //各线程计算自己部分的面积local_pi
        x = step * (i - 0.5);
        local_sum = local_sum + 4.0 / (1.0 + x * x);
    }
    local_pi = local_sum * step;
    MPI_Reduce(&local_pi, &pi, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);  //归约local_pi到pi
    if(id == 0){
        t2 = MPI_Wtime();
        printf("time: %fs\n", t2 - t1);
        printf("PI is approximately %.16f\n", pi);
    }
    MPI_Finalize();
    return 0;
}

PSRS排序

#include <stdlib.h>
#include <stdio.h>
#include "mpi.h"

#define INF 2147483647

/**************************************************
* 合并两个已排好序的子数组A[l : m], A[m + 1 : r], 写回A[l : r]
***************************************************/
void Merge(int *A, int l, int m, int r){
    int i, j, k, n1 = m - l + 1, n2 = r - m;
    int *L = (int *)malloc((n1 + 1) * sizeof(int));
    int *R = (int *)malloc((n2 + 1) * sizeof(int));
    for(i = 0; i < n1; i++) L[i] = A[l + i];
    for(j = 0; j < n2; j++) R[j] = A[m + 1 + j];
    L[i] = R[j] = INF;
    i = j = 0;
    for(k = l; k <= r; k++) if(L[i] <= R[j]) A[k] = L[i++]; else A[k] = R[j++];
    free(L); free(R);
} 

/**************************************************
* 对A[l : r]进行归并排序
***************************************************/
void MergeSort(int *A, int l, int r){
    if(l < r){
        int m = (l + r) / 2;
        MergeSort(A, l, m);
        MergeSort(A, m + 1, r);
        Merge(A, l, m, r);
    }
} 

/**************************************************
* 对A[0 : n - 1]进行PSRS排序
***************************************************/
void PSRS(int *A, int n, int id, int num_processes){ //每个进程都会执行这个函数,这里面的变量每个进程都有一份,因此都是局部的(对于当前进程而言)
    int per; 
    int *samples, *global_samples; //global表示这个变量是主进程会使用的,但事实上每个进程都声明了
    int *pivots; 
    int *sizes, *newsizes;
    int *offsets, *newoffsets;
    int *newdatas;
    int newdatassize;
    int *global_sizes;
    int *global_offsets;
    //-------------------------------------------------------------------------------------------------------------------

    per = n / num_processes; //A的n个元素划分为num_processes段,每个进程处理per个元素
    samples = (int *)malloc(num_processes * sizeof(int)); //当前进程的采样数组
    pivots = (int *)malloc(num_processes * sizeof(int)); //num_processes - 1 个主元,最后一个设为INF,作为哨兵
    if(id == 0){ //主进程申请全局采样数组
        global_samples = (int *)malloc(num_processes * num_processes * sizeof(int)); //正则采样数为 num_processes * num_processes 
    }
    MPI_Barrier(MPI_COMM_WORLD);//设置路障,同步所有进程
    //-------------------------------------------------------------------------------------------------------------------
    //(1)均匀划分,当前进程对A中属于自己的部分进行串行归并排序
    MergeSort(A, id * per, (id + 1) * per - 1); //这里没有把A中对应当前进程的数据复制到当前进程,而是直接对A部分排序
    //(2)正则采样,当前进程选出 num_processes 个样本放在local_sample中
    for(int k = 0; k < num_processes; k++) 
        samples[k] = A[id * per + k * per / num_processes];
    //主进程的sample收集各进程的local_sample,共 num_processes * num_processes 个
    MPI_Gather(samples, num_processes, MPI_INT, global_samples, num_processes, MPI_INT, 0, MPI_COMM_WORLD);
    //-------------------------------------------------------------------------------------------------------------------
    //(3)采样排序 (4)选择主元
    if(id == 0){ //主进程
        MergeSort(global_samples, 0, num_processes * num_processes - 1); //对采样的num_processes * num_processes个样本进行排序
        for(int k = 0; k < num_processes - 1; k++) //选出num_processes - 1个主元
            pivots[k] = global_samples[(k + 1) * num_processes];
        pivots[num_processes - 1] = INF; //哨兵
    }
    //主进程向各个进程广播,所有进程拥有一样的pivots数组
    MPI_Bcast(pivots, num_processes, MPI_INT, 0, MPI_COMM_WORLD);
    MPI_Barrier(MPI_COMM_WORLD);
    //-------------------------------------------------------------------------------------------------------------------
    sizes = (int *)calloc(num_processes, sizeof(int)); //当前进程的per个元素根据主元划分之后的每段的长度,用calloc分配后自动初始化为0
    offsets = (int *)calloc(num_processes, sizeof(int)); //当前进程的per个元素根据主元划分之后的每段的起始位置,用calloc分配后自动初始化为0
    newsizes = (int *)calloc(num_processes, sizeof(int)); //当前进程在进行全局交换之后的每段的长度,用calloc分配后自动初始化为0
    newoffsets = (int *)calloc(num_processes, sizeof(int)); //当前进程在进行全局交换之后的每段的起始位置,用calloc分配后自动初始化为0
    //(5)主元划分
    for(int k = 0, j = id * per; j < id * per + per; j++){ //当前进程对自己的per个元素按主元划分为num_processes段,此处不处理数据,只计算每段大小
        if(A[j] < pivots[k]) //如果之前不用哨兵,最后一段要单独考虑
            sizes[k]++;
        else
            sizes[++k]++;
    }
    //(6)全局交换
    //多对多全局交换消息,首先每个进程向每个接收者发送接收者对应的【段的大小】
    MPI_Alltoall(sizes, 1, MPI_INT, newsizes, 1, MPI_INT, MPI_COMM_WORLD);
    //计算原来的段偏移数组,新的段偏移数组,新的数据大小
    newdatassize = newsizes[0];
    for(int k = 1; k < num_processes; k++){
        offsets[k] = offsets[k - 1] + sizes[k - 1];
        newoffsets[k] = newoffsets[k - 1] + newsizes[k - 1];
        newdatassize += newsizes[k];
    }
    //申请当前进程新的数据空间
    newdatas = (int *)malloc(newdatassize * sizeof(int)); //当前进程在进行全局交换之后的数据,是由交换后的各段组合而成的
    //多对多全局交换消息,每个进程向每个接收者发送接收者对应的【段】
    MPI_Alltoallv(&(A[id * per]), sizes, offsets, MPI_INT, newdatas, newsizes, newoffsets, MPI_INT, MPI_COMM_WORLD);
    MPI_Barrier(MPI_COMM_WORLD);
    //-------------------------------------------------------------------------------------------------------------------
    //(7)当前进程归并排序自己的新数据
    MergeSort(newdatas, 0, newdatassize - 1);
    MPI_Barrier(MPI_COMM_WORLD);
    //(8)主进程收集各个进程的数据,写回A
    //首先收集各进程新数据的大小
    if(id == 0)
        global_sizes = (int *)calloc(num_processes, sizeof(int));
    MPI_Gather(&newdatassize, 1, MPI_INT, global_sizes, 1, MPI_INT, 0, MPI_COMM_WORLD);
    //主进程计算即将搜集的各进程数据的起始位置
    if(id == 0){ 
        global_offsets = (int *)calloc(num_processes, sizeof(int));
        for(int k = 1; k < num_processes; k++)
            global_offsets[k] = global_offsets[k - 1] + global_sizes[k - 1];
    }
    //主进程收集各个进程的数据
    MPI_Gatherv(newdatas, newdatassize, MPI_INT, A, global_sizes, global_offsets, MPI_INT, 0, MPI_COMM_WORLD);
    MPI_Barrier(MPI_COMM_WORLD);
    //-------------------------------------------------------------------------------------------------------------------
    //销毁动态数组
    free(samples); samples = NULL;
    free(pivots); pivots = NULL;
    free(sizes); sizes = NULL;
    free(offsets); offsets = NULL;
    free(newdatas); newdatas = NULL;
    free(newsizes); newsizes = NULL;
    free(newoffsets); newoffsets = NULL;
    if(id == 0){ 
        free(global_samples); global_samples = NULL;
        free(global_sizes); global_sizes = NULL;
        free(global_offsets); global_offsets = NULL;
    }
}

int main(int argc, char *argv[]){
    int A[27] = {15, 46, 48, 93, 39, 6, 72, 91, 14, 36, 69, 40, 89, 61, 97, 12, 21, 54, 53, 97, 84, 58, 32, 27, 33, 72, 20};
    double t1, t2;
    int id, num_processes;
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &num_processes); //获取进程数
    MPI_Comm_rank(MPI_COMM_WORLD, &id); //获取当前进程id
    if(id == 0)
        t1 = MPI_Wtime();
    PSRS(A, 27, id, num_processes);
    if(id == 0){
        t2 = MPI_Wtime();
        printf("time: %lfs\n", t2 - t1);
        for(int i = 0; i < 27; i++)
            printf("%d ", A[i]);
        printf("\n");
    }
    MPI_Finalize();
    return 0;
}

结果

img
img

搜索
背景设置