并行计算上机实验一 OpenMP实现π值的计算和PSRS排序
2019-05-02

题目

OpenMP实现π值的计算和PSRS排序

算法设计

π值的计算

基本的串行算法

![http://img.blog.csdn.net/20150

416160118625](lab1/clip_image002.jpg)

利用公式arctan(1)=π/4以及(arctan(x))’=1/(1+x^2),在[0, 1]上对f(x)= 4/(1+x^2)求积分,使用积分的定义离散化近似求解。将[0, 1]分成大量的小区间,使用for循环在每个小区间上计算y_i的值,最后求和。

使用并行域并行化的程序

在基本的串行算法上,用编译制导语句#pragma omp parallel将y_i的计算并行化(注意将使用到的变量私有化,每个线程各自拥有),显式地分配for循环,将紧挨着的p个区间的y_i的值分配给p个处理器分别计算(而不是将全部区间直接划分给p个处理器,这样可能导致各线程负载不均衡)。退出并行域之后,主线程对p个处理器各自的局部和sum进行求和。

使用共享任务结构并行化的程序

在基本的串行算法上,用编译制导语句#pragma omp parallel将y_i的计算并行化,但是并不显式地分配for循环,直接在for循环前面加上编译制导语句#pragma omp for,系统将for循环自动分配给p个处理器。退出并行域之后,主线程对p个处理器各自的局部和进行求和。

使用private子句和critical部分并行化的程序

在基本的串行算法上,用编译制导语句#pragma omp parallel将y_i的计算并行化(注意将使用到的变量私有化,每个线程各自拥有),显式地分配for循环,将紧挨着的p个区间的y_i的值分配给p个处理器分别计算(而不是将全部区间直接划分给p个处理器,这样可能导致各线程负载不均衡)。随后,直接在并行域内对上面的结果求和,但必须加上编译制导语句#pragma omp critical,表示指定代码段在同一时刻只能由一个线程进行执行。

使用并行规约的并行程序

在基本的串行算法上,用编译制导语句#pragma omp parallel for reduction(+:sum) 将y_i的计算并行化(注意将使用到的变量私有化,每个线程各自拥有),显式地分配for循环,将紧挨着的p个区间的y_i的值分配给p个处理器分别计算(而不是将全部区间直接划分给p个处理器,这样可能导致各线程负载不均衡)。sum存储了各个处理器的局部和,最后无需主进程显式地对局部和求和,系统将使用指定的算法(+),对各个局部的sum自动归约。

PSRS排序

给定n个元素的数组A[0..n - 1],执行以下步骤

(1)均匀划分:将A[0..n - 1]均匀划分成p段,每个处理器pi处理A[(i*n/p..(i+1)*n/p - 1]

(2)局部排序:每个处理器pi调用串行排序算法对A[(i*n/p..(i+1)*n/p - 1]排序

(3)选取样本:每个处理器pi从自己的有序子数组A[(i*n/p..(i+1)*n/p - 1]中等间距选取p个样本元素

(4)样本排序:用一台处理器对p2个样本元素进行串行排序

(5)选择主元:用一台处理器从排好序的样本序列中选取p - 1个主元,并播送给其他pi

(6)主元划分:每个处理器pi按p - 1个主元将自己的有序子数组A[(i*n/p..(i+1)*n/p - 1]划分成p段,此时,新申请一个数组存放p段数据(每段长度可能不同)

(7)全局交换:每个处理器pi将自己的有序子数组中的每段按段号交换到对应id的处理器中。

(8)归并排序:每个处理器pi对接收到的元素进行归并排序

(9)将各处理器的新数据写回A,此时,原数组A已有序

算法分析

π值的计算

n为[0, 1]之间划分的区间数

对于串行算法,时间复杂度为O(n)

对于4种并行算法,时间复杂度为O(n/p)其中p为处理器数

PSRS排序

(参考文献 Hanmao Shi , Jonathan Schaeffer. Parallel Sorting by Regular Sampling.)

n为A的元素个数,p为处理器数,w = n/p

则时间复杂度为各阶段时间复杂度之和

, 近似为

另外,算法第六步每个处理器的w个数据根据p – 1个主元划分,每段的长度可能不相等,因此数组低维的长度不等,无法实现确定,但文献中证明了每段的长度最长不超过2w个元素。

Theorem 1: In phase 3 of PSRS, each processor merges less than 2w elements.

算法源代码

π值的计算

基本的串行算法
#include <stdio.h>
static long num_steps = 100000;//越大值越精确
double step;
void main(){
    int i;
    double x, pi, sum = 0.0;
    step = 1.0 / (double)num_steps;
    for(i = 1; i <= num_steps; i++){
        x = (i - 0.5) * step;
        sum = sum + 4.0/(1.0 + x * x);
    }
    pi = step * sum;
    printf("%lf\n", pi);
}
使用并行域并行化的程序
#include <stdio.h>
#include <omp.h>
#define NUM_THREADS 2
static long num_steps = 100000;
double step;
void main ()
{ 
    int i;
    double x, pi, sum[NUM_THREADS];
    step = 1.0 / (double)num_steps;
    omp_set_num_threads(NUM_THREADS);  //设置2线程
#pragma omp parallel private(i)  //并行域开始,每个线程(0和1)都会执行该代码
{
    double x;
    int id;
    id = omp_get_thread_num();
    for(i = id, sum[id] = 0.0; i < num_steps; i = i + NUM_THREADS){
        x = (i + 0.5) * step;
        sum[id] += 4.0 / (1.0 + x * x);
    }
}
    for(i = 0, pi = 0.0; i < NUM_THREADS; i++)
        pi += sum[i] * step;
    printf("%lf\n", pi);
}   
//共2个线程参加计算,其中线程0进行迭代步0,2,4,...线程1进行迭代步1,3,5,....
使用共享任务结构并行化的程序
#include <stdio.h>
#include <omp.h>
#define NUM_THREADS 2
static long num_steps = 100000;
double step;
void main ()
{ 
    int i;
    double x, pi, sum[NUM_THREADS];
    step = 1.0 / (double)num_steps;
    omp_set_num_threads(NUM_THREADS);  //设置2线程
#pragma omp parallel  //并行域开始,每个线程(0和1)都会执行该代码
{
    double x;
    int id;
    id = omp_get_thread_num();
    sum[id] = 0;
    #pragma omp for  //未指定chunk,迭代平均分配给各线程(0和1),连续划分
    for(i = 0; i < num_steps; i++){
        x = (i + 0.5) * step;
        sum[id] += 4.0 / (1.0 + x * x);
    }
}
    for(i = 0, pi = 0.0; i < NUM_THREADS; i++)
        pi += sum[i] * step;
    printf("%lf\n", pi);
}//共2个线程参加计算,其中线程0进行迭代步0~49999,线程1进行迭代步50000~99999.
使用private子句和critical部分并行化的程序
#include <stdio.h>
#include <omp.h>
#define NUM_THREADS 2
static long num_steps = 100000;
double step;
void main ()
{ 
    int i;
    double pi = 0.0, sum = 0.0, x = 0.0;
    step = 1.0 / (double)num_steps;
    omp_set_num_threads(NUM_THREADS);  //设置2线程
#pragma omp parallel private(x, sum, i) //该子句表示x,sum,i变量对于每个线程是私有的
{
    int id; //不用定义为私有,因为id是在并行域内定义的
    id = omp_get_thread_num();
    for(i = id, sum = 0.0; i < num_steps; i = i + NUM_THREADS){
        x = (i + 0.5) * step;
        sum += 4.0 / (1.0 + x * x);
    }
    #pragma omp critical  //指定代码段在同一时刻只能由一个线程进行执行
    pi += sum * step;
}
    printf("%lf\n", pi);
}
//共2个线程参加计算,其中线程0进行迭代步0,2,4,...线程1进行迭代步1,3,5,....
//当被指定为critical的代码段正在被0线程执行时,1线程的执行也到达该代码段,则它将被阻塞知道0线程退出临界区。
使用并行规约的并行程序
#include <stdio.h>
#include <omp.h>
#define NUM_THREADS 2
static long num_steps = 100000;
double step;
void main ()
{ 
    int i;
    double pi = 0.0, sum = 0.0, x = 0.0;
    step = 1.0 / (double)num_steps;
    omp_set_num_threads(NUM_THREADS);  //设置2线程
    #pragma omp parallel for reduction(+:sum) private(x) //每个线程保留一份私有拷贝sum,x为线程私有,最后对线程中所以sum进行+规约,并更新sum的全局值
    for(i = 1; i <= num_steps; i++){
        x = (i - 0.5) * step;
        sum += 4.0 / (1.0 + x * x);
    }
    pi = sum * step;
    printf("%lf\n",pi);
}   //共2个线程参加计算,其中线程0进行迭代步0~49999,线程1进行迭代步50000~99999.

PSRS排序

#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<omp.h>

#define INF 2147483647
#define NUM_THREADS 3

/**************************************************
* 合并两个已排好序的子数组A[l : m], A[m + 1 : r], 写回A[l : r]
***************************************************/
void Merge(int *A, int l, int m, int r){
    int i, j, k, n1 = m - l + 1, n2 = r - m;
    int *L = (int *)malloc((n1 + 1) * sizeof(int));
    int *R = (int *)malloc((n2 + 1) * sizeof(int));
    for(i = 0; i < n1; i++) L[i] = A[l + i];
    for(j = 0; j < n2; j++) R[j] = A[m + 1 + j];
    L[i] = R[j] = INF;
    i = j = 0;
    for(k = l; k <= r; k++) if(L[i] <= R[j]) A[k] = L[i++]; else A[k] = R[j++];
    free(L); free(R);
} 

/**************************************************
* 对A[l : r]进行归并排序
***************************************************/
void MergeSort(int *A, int l, int r){
    if(l < r){
        int m = (l + r) / 2;
        MergeSort(A, l, m);
        MergeSort(A, m + 1, r);
        Merge(A, l, m, r);
    }
} 

/**************************************************
* 对A[0 : n - 1]进行PSRS排序
***************************************************/
void PSRS(int *A, int n){
    int id;
    int per = n / NUM_THREADS; //A的n个元素划分为NUM_THREADS段,每个处理器处理per个元素
    int sample[NUM_THREADS * NUM_THREADS]; //正则采样数为 NUM_THREADS * NUM_THREADS 
    int pivot[NUM_THREADS]; //NUM_THREADS - 1 个主元,最后一个设为INF,作为哨兵
    int *A2[NUM_THREADS][NUM_THREADS] = {NULL}; //数组,NUM_THREADS处理器把自己的per个元素划分为NUM_THREADS段
    int A2len[NUM_THREADS][NUM_THREADS] = {0}; //上面数组的实际存储长度,每个处理器的每个段的元素个数
    
    omp_set_num_threads(NUM_THREADS);
    #pragma omp parallel private(id) shared(per, A, A2, n, sample, pivot, A2len) //NUM_THREADS 个处理器同时执行,共用代码
    {
        id = omp_get_thread_num(); //当前处理器的id
        //(1)均匀划分 (2)每个处理器进行局部串行归并排序
        MergeSort(A, id * per, (id + 1) * per - 1); 
        
        //(2)正则采样,每个处理器分别选出 NUM_THREADS 个样本,共 NUM_THREADS * NUM_THREADS 个
        for(int k = 0; k < NUM_THREADS; k++) 
            sample[id * NUM_THREADS + k] = A[id * per + k * per / NUM_THREADS];
        #pragma omp barrier //设置路障,同步所有线程
        
        //(3)采样排序 (4)选择主元
        #pragma omp master //主线程执行,其他线程不执行
        {
            MergeSort(sample, 0, NUM_THREADS * NUM_THREADS - 1); //对采样的NUM_THREADS * NUM_THREADS个样本进行排序
            for(int k = 0; k < NUM_THREADS - 1; k++) //选出NUM_THREADS - 1个主元
                pivot[k] = sample[(k + 1) * NUM_THREADS];
            pivot[NUM_THREADS - 1] = INF; //哨兵
        }
        #pragma omp barrier
        
        //(5)主元划分
        for(int k = 0; k < NUM_THREADS; k++)
            A2[id][k] = (int *)malloc(sizeof(int) * per * 2); //每个处理器的每个段的数组申请空间,最长不会超过 2 * per 个
        for(int k = 0, j = id * per; j < id * per + per; j++){ //每个处理器对自己的per个元素按主元划分为NUM_THREADS段
            if(A[j] < pivot[k]){ //这里如果不用哨兵,最后一段要单独考虑
                A2[id][k][A2len[id][k]++] = A[j];
            }
            else{
                k++;
                A2[id][k][A2len[id][k]++] = A[j];
            }
        }
        #pragma omp barrier //这里必须有路障,否则部分处理器先划分结束后就开始全局交换,但此时数据还没有准备好

        //(6)全局交换
        //每个处理器把自己的per个元素划分成的NUM_THREADS个段“发送”给对应的NUM_THREADS个处理器(除了对应自己的那段)
        //实际上是每个处理器主动去其他处理器的per里取来自己的那段
        for(int k = 0; k < NUM_THREADS; k++){
            if(k != id){ //A2[id][id]是每个处理器对应自己的那段
                memcpy(A2[id][id] + A2len[id][id], A2[k][id], sizeof(int) * A2len[k][id]);//从其他处理器那里获取自己的那段直接接在后面
                A2len[id][id] += A2len[k][id];
            }
        }
        
        //(7)每个处理器归并排序
        MergeSort(A2[id][id], 0, A2len[id][id] - 1);
        
        #pragma omp barrier
        //(8)A2写回A
        #pragma omp master //主线程执行,其他线程不执行
        {
            for(int j = 0, k = 0; k < NUM_THREADS; k++){
                memcpy(A + j, A2[k][k], sizeof(int) * A2len[k][k]);
                j += A2len[k][k];
            }
        }
        #pragma omp barrier
        
        //销毁A2动态申请的空间
        for(int k = 0; k < NUM_THREADS; k++)
            free(A2[id][k]);
    }
}

int main(){
    int A[27] = {15, 46, 48, 93, 39, 6, 72, 91, 14, 36, 69, 40, 89, 61, 97, 12, 21, 54, 53, 97, 84, 58, 32, 27, 33, 72, 20};
    double t1, t2;
    t1 = omp_get_wtime();
    PSRS(A, 27);
    t2 = omp_get_wtime();
    printf("time: %lfs\n", t2 - t1);
    for(int i = 0; i < 27; i++)
        printf("%d ", A[i]);
    printf("\n");
    return 0;
}

结果

img

img

搜索
背景设置