Introduction HPC

Sequential programs(串行程序)

Parallel programs(并行程序)

OpenMP

OpenMP execution model

fork - join 并行模式

只有一个master thread ,程序开始(单线程)- fork - parallel - join

OpenMP memory model

All threads have access to the same **shared memory space **所有线程共享同意内存空间

OpenMP derectives

1
#pragma omp <directive> [clause[[,] clause] ... ]
  • #pragma omp :告诉编译器这是一个 OpenMP 指令。
  • :具体的并行控制指令。
  • clause :附加选项(子句),比如 private、reduction、schedule。

指令

parallel

表是这段代码被多个线程并行执行

1
2
3
4
#pragma omp parallel
{
printf("Hello world \n");
}

for

用于for循环之前,将循环分配到多个线程执行,必须保证每次循环之间无相关性!

1
2
3
4
5
6
7
#pragma omp parallel   // 创建一个线程组
{
#pragma omp for // 把 for 循环的迭代分配给这些线程
for (int i = 0; i < N; i++) {
A[i] = i * 2;
}
}

parallel for

等价于 parallel + for

1
2
3
4
#pragma omp parallel for
for (int i = 0; i < N; i++) {
A[i] = i * 2;
}

sections

把不同的代码段(section)分配给不同的线程去执行,每个 section 只会执行一次。

相当于 任务并行(task parallelism),而不是 数据并行(data parallelism)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <stdio.h>
#include <omp.h>

int main() {
#pragma omp parallel
{
#pragma omp sections
{
#pragma omp section
{
printf("Thread %d doing task A\n", omp_get_thread_num());
}

#pragma omp section
{
printf("Thread %d doing task B\n", omp_get_thread_num());
}

#pragma omp section
{
printf("Thread %d doing task C\n", omp_get_thread_num());
}
}
}
return 0;
}

parallel sections

single

在一个并行区域里,只允许一个线程执行这段代码,而其他线程 跳过它,继续往下执行

1
2
3
4
5
6
7
#pragma omp parallel
{
#pragma omp single
{
printf("Only one thread executes this!\n");
}
}

barrier

让所有线程在这里集合(同步点),只有当 所有线程都到达这个位置 时,才会一起继续往下执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <stdio.h>
#include <omp.h>

int main() {
#pragma omp parallel
{
int tid = omp_get_thread_num();

printf("Thread %d: part A done\n", tid);

#pragma omp barrier // 所有线程必须等到这里

printf("Thread %d: part B done\n", tid);
}
return 0;
}

Thread 0: part A done
Thread 2: part A done
Thread 1: part A done
Thread 3: part A done
Thread 0: part B done
Thread 1: part B done
Thread 2: part B done
Thread 3: part B done
  • part A 部分的输出是乱序的(线程独立执行)。
  • 但是所有线程必须 等到 barrier,再一起进入 part B
  • 所以 part B 的内容永远不会早于任何一个 part A

nowait

取消隐式的 barrier,也就是不让线程在循环或指令后等待其他线程完成,而是直接继续执行后续代码。

1
2
3
4
5
#pragma omp for
for (int i = 0; i < N; i++) {
// do something
}
// 隐式 barrier:所有线程必须等到这里,才继续执行下面的代码
1
2
3
4
5
#pragma omp for nowait
for (int i = 0; i < N; i++) {
// do something
}
// 没有隐式 barrier,线程可以立即继续执行下面的代码
  • 当循环后面的代码不依赖于当前循环的完成结果时,可以加 nowait 提高并行效率。
  • 如果后续代码依赖当前循环的结果,不能用 nowait,否则会出现数据竞争或错误。

if

条件性地决定是否在并行区域创建线程。也就是说,可以根据一个布尔条件来选择执行并行版本还是串行版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include <stdio.h>
#include <omp.h>

int main() {
int N = 5;

// 如果 N > 10 则使用并行计算, 否则使用串行计算
#pragma omp parallel if(N > 10)
{
// doing something
}

return 0;
}

atomic

保证某个内存操作是原子的(不可分割的),防止多个线程同时修改同一个变量时产生 竞争条件 (race condition)

但只能管一行语句

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <stdio.h>
#include <omp.h>

int main() {
int sum = 0;

#pragma omp parallel for
for (int i = 0; i < 1000; i++) {
#pragma omp atomic
sum += 1; // 原子操作,避免竞态条件
}

printf("sum = %d\n", sum);
return 0;
}

critical

作用等同于atomic,用于一段代码,保证一段代码 同一时刻只允许一个线程进入

1
2
3
4
5
6
7
8
9
int sum = 0;

#pragma omp parallel for
for (int i = 0; i < 10; i++) {
#pragma omp critical
{
sum += i; // 每个线程都会来执行,但一次只能进一个
}
}
  • single VS critical :
    • single = “只做一次,随便哪个线程做就行”
    • critical = “大家都要做,但得一个一个排队做,不能一起做”

master

指定一段代码由主线程(thread 0)执行,其余线程跳过,不等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <stdio.h>
#include <omp.h>

int main() {
#pragma omp parallel
{
int tid = omp_get_thread_num();

#pragma omp master
{
printf("This is printed only by the master thread (tid = %d)\n", tid);
}

printf("Thread %d reached here.\n", tid);
}
return 0;
}

只有主线程会执行里面的printf, 其他线程都执行外面的printf

orderd

用来保证在并行循环中某些操作按循环迭代的顺序执行,即使循环是并行执行的。

  • 当你用 #pragma omp for 并行循环时,每个迭代可能被不同线程同时执行。
  • 如果某些操作必须严格按照迭代顺序执行(比如输出、累加到共享数组),就可以使用 ordered。
  • 必须在 for 循环中加上 ordered 子句,然后在循环体内用 #pragma omp ordered 标记需要按顺序执行的部分。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <stdio.h>
#include <omp.h>

int main() {
#pragma omp parallel for ordered
for (int i = 0; i < 8; i++) {
// 可以并行计算 i*i
int val = i * i;

// 但是下面的打印顺序必须按 i 从 0 到 7
// 输出顺序一定是 i=0, i=1, i=2, …,即使循环是并行执行的。
#pragma omp ordered
printf("i=%d, val=%d\n", i, val);
}
return 0;
}

子句

shared

指定变量在多个线程间共享。

1
2
3
4
5
int sum = 0;
#pragma omp parallel for shared(sum)
for(int i = 0; i < 10; i++) {
sum += i; // 注意:可能出现竞争条件,需要 atomic 或 critical
}

private / firstprivate / lastprivate

  • private(var):每个线程有独立的 var 副本,原来的共享变量不会被线程修改。
  • firstprivate(var):每个线程有独立副本,并初始化为原来的值。
  • lastprivate(var):循环结束后,最后一次迭代(代码理论上的最后一次)的线程的 var 值写回原来的共享变量。
1
2
3
4
5
6
7
8
9
10
11
12
13
int x = 10;

// private
#pragma omp parallel for private(x)
for(int i = 0; i < 4; i++) {
x = i; // 每个线程修改自己的 x,不影响其他线程
}

// firstprivate
#pragma omp parallel for firstprivate(x)
for(int i = 0; i < 4; i++) {
printf("%d\n", x); // 每个线程初始 x = 10
}
1
2
3
4
5
6
7
8
9
10
11
12
13
// lastprivate
int main() {
int x = 0;

#pragma omp parallel for lastprivate(x)
for (int i = 0; i < 10; i++) {
x = i; // 每个线程有自己的 x
printf("Thread %d: x = %d\n", omp_get_thread_num(), x);
}

printf("After loop, x = %d\n", x); // x = 9
return 0;
}

threadprivate

用于声明每个线程都有自己独立的全局或静态变量副本,每个线程访问的是自己独立的副本,互不干扰。

  • 默认情况下,全局变量是所有线程共享的
  • 如果你希望每个线程有自己的独立版本,可以用 threadprivate。
  • 和局部变量不同,局部变量天然就是线程私有的;threadprivate 针对的是全局或 static 变量。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <omp.h>
#include <stdio.h>

int counter = 0; // 全局变量
#pragma omp threadprivate(counter)

int main() {
#pragma omp parallel num_threads(4)
{
counter = omp_get_thread_num(); // 每个线程写自己的值
printf("Thread %d: counter = %d\n", omp_get_thread_num(), counter);
}
return 0;
}

Thread 1: counter = 1
Thread 2: counter = 2
Thread 0: counter = 0
Thread 3: counter = 3

default

  • 设置并行区域中变量的默认属性:
    • default(none):所有变量必须显式声明 shared 或 private,安全
    • default(shared):未声明的变量默认共享
    • default(private):未声明的变量默认私有
1
2
3
4
5
6
int x, y;
#pragma omp parallel default(none) shared(x) private(y)
{
y = omp_get_thread_num();
x = y;
}

reduction

  • 对共享变量进行归约操作(累加、乘积、最大值、最小值等),保证结果正确。
1
2
3
4
5
6
7
int sum = 0;
#pragma omp parallel for reduction(+:sum)
for(int i = 0; i < 100; i++) {
sum += i; // 自动在各线程归约
}

printf("sum = %d\n",sum);

copyin

用来给 线程私有的全局或静态变量(用 threadprivate 声明) 初始化值。它把 主线程(thread 0)的值复制到所有线程的私有副本

换句话说,threadprivate 声明的变量每个线程都有自己的副本,而 copyin 可以用来统一初始化这些副本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <stdio.h>
#include <omp.h>

int counter = 0;
#pragma omp threadprivate(counter)

int main() {
counter = 42; // 主线程初始化

#pragma omp parallel copyin(counter)
{
printf("Thread %d: counter = %d\n", omp_get_thread_num(), counter);
counter += omp_get_thread_num(); // 每个线程修改自己的副本
}

printf("Main thread counter = %d\n", counter); // 主线程的值不变
return 0;
}

Thread 0: counter = 42
Thread 1: counter = 42
Thread 2: counter = 42
Thread 3: counter = 42
Main thread counter = 42
  • counter 是 threadprivate,每个线程有独立副本
  • copyin(counter) 把主线程的值(42)复制给每个线程的副本
  • 每个线程修改自己的 counter 不会影响其他线程
  • 主线程的 counter 保持原值

copyprivate

  • copyprivate 是 OpenMP 的 single 子句的一部分。
  • 作用:在 parallel 区域中,single 区块由一个线程执行,但这个线程的变量值可以复制给其他线程
  • 通常配合 #pragma omp single 使用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <stdio.h>
#include <omp.h>

int main() {
int x;

#pragma omp parallel
{
#pragma omp single copyprivate(x)
{
x = 42; // 只有一个线程执行
printf("Single thread sets x = %d\n", x);
}

// 其他线程在这里就能使用刚刚初始化的 x
printf("Thread %d sees x = %d\n", omp_get_thread_num(), x);
}

return 0;
}

// 1. #pragma omp single:parallel 区域中只有一个线程执行单独的代码块。
// 2. copyprivate(x):single 线程初始化的 x 值复制给所有线程的私有副本。
// 3. 其他线程即使没有执行 single 内的代码,也能得到相同的 x 值。

collapse

collapse(n) 用来 把 n 层嵌套循环合并为一个大的迭代空间,再分配给线程执行。

1
2
3
4
5
6
7
8
// 没有collapse(n)
// 只有外层循环 i 是被 OpenMP 分配给线程并行执行的,内层循环 j 在每个线程中是顺序执行的
#pragma omp parallel for
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 4; j++) {
printf("Thread %d: i=%d, j=%d\n", omp_get_thread_num(), i, j);
}
}
1
2
3
4
5
6
7
8
// 有collapse
// 这时 OpenMP 会把外层和内层循环合并成 16 个迭代,然后分配给线程
#pragma omp parallel for collapse(2)
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 4; j++) {
printf("Thread %d: i=%d, j=%d\n", omp_get_thread_num(), i, j);
}
}
  • collapse = 把多层循环“摊平”成一层迭代空间
  • 适合嵌套循环层次少,迭代次数不够多的情况
  • 可以与 schedule 结合,提高负载均衡

任务调度

schedule

  • schedule 是 OpenMP 的 for 或 parallel for 子句的一部分,用来控制 循环迭代分配给线程的策略
  • 主要作用:决定线程如何分配迭代任务,以及每次分配多少个迭代
  • type:static, dynamic, guided, runtime(真正意义上不算)
  • chunck_size:块大小
1
schedule(type[,chunk_size])
类型 说明 示例
static 循环迭代 均匀分块 分配给线程。默认 chunk_size = N / num_threads #pragma omp for schedule(static)
static, chunk_size 每个线程一次分配 chunk_size 个迭代,按顺序循环分配 #pragma omp for schedule(static, 2)
dynamic 循环迭代 动态分配,线程完成后再拿新的迭代块。适合负载不均的循环 #pragma omp for schedule(dynamic, 3)
guided 每次分配迭代块大小 逐渐减小,前期大块,后期小块。如果不指定chunk size,最小可以到1 #pragma omp for schedule(guided, 2)
auto 让编译器/运行时自动选择调度策略 #pragma omp for schedule(auto)
runtime 运行时根据环境变量 OMP_SCHEDULE 决定 #pragma omp for schedule(runtime)
  • static:负载均匀,开销小
  • dynamic:负载不均,线程可以“抢活”,开销略大
  • guided:负载递减型动态分配
  • auto / runtime:交给编译器或环境变量控制

task

  • Task 是 OpenMP 的一个并行单位,表示一段可以延迟执行的代码块。
  • 可以想象成一个 “工作单元”,由线程池中的线程去执行。
  • 与 parallel for 不同,它不一定是循环的迭代,也可以是任意代码块。
  • Task 可以有 依赖关系(depend),保证任务之间的执行顺序。

特点:

  1. 延迟执行:创建 task 时,OpenMP 不会立即执行它,而是放入任务队列等待线程执行。
  2. 灵活性高:可以表示任意代码,不只是循环。
  3. 线程复用:多个 task 可以被线程池中的空闲线程执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <stdio.h>
#include <omp.h>

int main() {
#pragma omp parallel
{
#pragma omp single // 保证只有一个线程创建任务
{
#pragma omp task
printf("Task 1 executed by thread %d\n", omp_get_thread_num());

#pragma omp task
printf("Task 2 executed by thread %d\n", omp_get_thread_num());
}
}
return 0;
}
  • single 保证任务只被创建一次。
  • 任务可以被任意空闲线程执行。
depend

在计算前缀和、Fibonacci 或矩阵块操作中,任务可能需要依赖其他任务完成的结果。

1
2
3
4
#pragma omp task depend(in: a) depend(out: b)
{
// task code
}
  • in: 表示依赖输入,任务必须等待这些数据准备好。
  • out: 表示任务会写这个数据。
  • inout: 表示任务既读又写。
1
2
3
4
5
6
7
8
9
10
// 二维前缀和
#pragma omp parallel
#pragma omp single
for (int x = 0; x < N; x++) {
for (int y = 0; y < N; y++) {
#pragma omp task depend(in: B[x-1][y], B[x][y-1], B[x-1][y-1]) depend(out: B[x][y])
B[x][y] = B[x-1][y] + B[x][y-1] - B[x-1][y-1] + A[x][y];
}
}
// B[x][y] 需要依赖 B[x-1][y], B[x][y-1], B[x-1][y-1] 的值
taskwait

用于 等待当前线程生成的所有子任务完成,然后再继续往下执行。

  • 它只等待 同一线程创建的任务
  • 不会等待其他线程创建的任务(除非它们是依赖任务)。
  • 放在并行区域或单线程区域中。
  • 后面不跟任何语句,只是一个同步点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <stdio.h>
#include <omp.h>

int main() {
#pragma omp parallel
#pragma omp single
{
#pragma omp task
printf("Task 1\n");

#pragma omp task
printf("Task 2\n");

// 等待上面所有任务完成
#pragma omp taskwait

printf("All tasks finished!\n");
}
return 0;
}

// 输出示意(顺序可能不同,但 All tasks finished! 一定在前两个任务后执行):
Task 1
Task 2
All tasks finished!

环境变量与 API 对照表

功能类别 环境变量 对应 API(代码里设置) 说明
线程数 OMP_NUM_THREADS omp_set_num_threads(int n) omp_get_num_threads() 控制并行区域的线程数
动态线程调整 OMP_DYNAMIC (TRUE/FALSE) omp_set_dynamic(int flag) omp_get_dynamic() 是否允许运行时调整线程数
线程总上限 OMP_THREAD_LIMIT omp_get_thread_limit() 限制整个程序最多线程数
嵌套并行 OMP_NESTED(旧标准) OMP_MAX_ACTIVE_LEVELS omp_set_max_active_levels(int levels) omp_get_max_active_levels() 设置并行嵌套层数
循环调度 OMP_SCHEDULE omp_set_schedule(omp_sched_t kind, int chunk) omp_get_schedule(&kind, &chunk) 控制 for 循环调度策略
绑定和亲和性 OMP_PROC_BIND omp_get_proc_bind() 控制线程是否绑定到 CPU
OMP_PLACES 无(环境变量指定) 线程可绑定的位置(cores/threads/sockets)
任务 OMP_MAX_TASK_PRIORITY 控制任务最大优先级
调试 & 性能 OMP_DISPLAY_ENV (TRUE/FALSE) 是否显示 OpenMP 环境设置
OMP_WAIT_POLICY (ACTIVE/PASSIVE) 控制空闲线程等待策略
OMP_CANCELLATION (TRUE/FALSE) omp_get_cancellation() 是否允许取消任务/循环
1
2
3
4
export OMP_NUM_THREADS=8
export OMP_SCHEDULE="dynamic,2"
export OMP_PROC_BIND=spread
export OMP_DISPLAY_ENV=TRUE

OpenMP 常用函数

1.线程管理

函数 说明
omp_set_num_threads(int n) 设置并行区域中的线程数(仅对后续并行区有效)
omp_get_num_threads() 获取当前并行区域的线程数
omp_get_thread_num() 获取当前线程编号(0 ~ n-1)
omp_get_max_threads() 返回可能使用的最大线程数
omp_set_dynamic(int flag) 开启/关闭动态调整线程数
omp_get_dynamic() 判断是否允许动态调整线程数
omp_get_thread_limit() 获取程序允许的最大线程数

2.并行控制

函数 说明
omp_in_parallel() 判断当前是否在并行区域内
omp_set_nested(int flag) (旧) 设置是否支持嵌套并行(OMP 4.0 前)
omp_set_max_active_levels(int levels) 设置最大并行嵌套层数
omp_get_max_active_levels() 获取最大嵌套层数
omp_get_level() 获取当前并行嵌套层数
omp_get_ancestor_thread_num(int level) 获取某层的祖先线程编号

3.调度与任务

函数 说明
omp_set_schedule(omp_sched_t kind, int chunk) 设置循环调度策略(static, dynamic, guided, auto)
omp_get_schedule(omp_sched_t *kind, int *chunk) 获取当前调度策略和 chunk 大小
omp_get_wtime() 获取墙钟时间(高精度计时)
omp_get_wtick() 获取 omp_get_wtime() 的精度
omp_get_num_procs() 获取可用的处理器数
omp_get_proc_bind() 获取线程绑定策略

4.任务与取消

函数 说明
omp_get_cancellation() 是否支持取消任务
omp_get_default_device() 获取默认设备(比如 GPU 设备 ID)
omp_set_default_device(int device_num) 设置默认设备

5.锁(同步机制)

函数 说明
omp_init_lock(omp_lock_t *lock) 初始化锁
omp_destroy_lock(omp_lock_t *lock) 销毁锁
omp_set_lock(omp_lock_t *lock) 获取锁(阻塞等待)
omp_unset_lock(omp_lock_t *lock) 释放锁
omp_test_lock(omp_lock_t *lock) 尝试获取锁(不阻塞,立即返回)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <stdio.h>
#include <omp.h>

int main() {
printf("最大线程数: %d\n", omp_get_max_threads());
printf("处理器数: %d\n", omp_get_num_procs());

double t1 = omp_get_wtime();

#pragma omp parallel
{
int id = omp_get_thread_num();
printf("线程 %d / %d 正在运行\n", id, omp_get_num_threads());
}

double t2 = omp_get_wtime();
printf("耗时: %f 秒\n", t2 - t1);

return 0;
}

OpenMP 编程常见坑点总结

1.循环迭代变量没声明为 private

写了平行区域 + for 循环,但是忘了加 private(i) → 会发生数据竞争。

1
2
3
4
5
6
#pragma omp parallel   // 创建多个线程
{
for (int i = 0; i < 10; i++) {
printf("Thread %d: i=%d\n", omp_get_thread_num(), i);
}
}
  • 这里 i 是 共享的(shared),所有线程同时用 同一个变量 i
  • 当多个线程同时修改 i,就会发生 数据竞争 (race condition)
  • English: The loop variable i is shared across threads, so multiple threads update it at the same time, causing a race condition.

解决办法,加上private(i) 或者使用parallel for(会自动把 i 设置成private)

1
2
3
4
5
6
7
8
9
10
11
#pragma omp parallel private(i)
{
for (int i = 0; i < 10; i++) {
printf("Thread %d: i=%d\n", omp_get_thread_num(), i);
}
}

#pragma omp parallel for
for (int i = 0; i < 10; i++) {
printf("Thread %d: i=%d\n", omp_get_thread_num(), i);
}

2.数据竞争(race condition)

  • 多个线程同时写共享变量 → 结果不确定。
  • 常见于计数、累加。
  • English: Shared variables modified by multiple threads without synchronization will lead to unpredictable results.
1
2
3
4
5
6
7
8
int sum = 0;
#pragma omp parallel for
for (int i = 0; i < 1000; i++) {
sum++; // ❌ 多线程同时写 sum → 结果随机
}
// ✅ 用 atomic 或 reduction
#pragma omp parallel for reduction(+:sum)
for (int i = 0; i < 1000; i++) sum++;

3.reduction

忘记 reduce
  • 很多人写并行求和、最值时直接 shared → 结果错误。
  • English: Without reduction, all threads write to the same shared variable at the same time, causing wrong results.
1
2
3
4
5
6
7
int sum = 0;
#pragma omp parallel for
for (int i = 0; i < N; i++) sum += A[i]; // ❌ 错

// ✅ 需要加 reduction()
#pragma omp parallel for reduction(+:sum)
for (int i = 0; i < N; i++) sum += A[i];
reduction 忘记初始化
  • reduction 会在每个线程里生成 sum 的副本,但如果 sum 没初始化,结果是随机的。
  • English: If the reduction variable is not initialized, each thread’s private copy starts with an undefined value, leading to incorrect results.
1
2
3
int sum;
#pragma omp parallel for reduction(+:sum)
for (int i=0; i<10; i++) sum += i; // ❌ sum 未初始化
1
2
3
4
// ✅正确写法
int sum = 0;
#pragma omp parallel for reduction(+:sum)
for (int i=0; i<10; i++) sum += i;

4.barrier / nowait 使用不当

  • 默认 for 和 sections 结束时会有隐式 barrier。
  • 如果用 nowait,要小心后续代码是否需要同步。
  • English: Using nowait skips synchronization, so some threads may read data before others finish writing.
1
2
3
4
5
6
7
8
#pragma omp parallel
{
#pragma omp for nowait
for (int i=0;i<10;i++) A[i]=i;

// ❌ 这里直接用 A[i],可能有线程还没写完
printf("Thread %d: A[5] = %d\n", omp_get_thread_num(), A[5]);
}
  • 所以有些线程可能还在写 A[i],但别的线程已经跑到 printf 了。
  • 结果就可能访问到 未初始化或部分更新的 A,产生 竞态条件 (race condition)

去掉 nowait(默认会有 barrier,同步所有线程):

1
2
3
4
5
6
7
8
#pragma omp parallel
{
#pragma omp for // 默认带 barrier
for (int i=0;i<10;i++) A[i]=i;

// ✔️ 到这里时,所有 A[i] 都保证写完
printf("Thread %d: A[5] = %d\n", omp_get_thread_num(), A[5]);
}

保留 nowait,但手动加 barrier

1
2
3
4
5
6
7
8
9
#pragma omp parallel
{
#pragma omp for nowait
for (int i=0;i<10;i++) A[i]=i;

#pragma omp barrier // ✔️ 等所有线程完成写操作

printf("Thread %d: A[5] = %d\n", omp_get_thread_num(), A[5]);
}

5.task 忘记加 taskwait

  • 如果 task 有依赖关系,必须手动同步。
  • English: Without taskwait, the program may continue before all tasks have finished, leading to incomplete results.
1
2
3
4
5
6
7
8
9
10
11
12
#pragma omp parallel
{
#pragma omp single
{
#pragma omp task
do_A();
#pragma omp task
do_B();
// ❌ 如果这里直接结束,B 可能还没完成
#pragma omp taskwait // ✅ 等待所有 task 完成
}
}

6.循环嵌套没有 collapse

  • 多重循环并行时,只 parallel 外层 → 内层仍是串行。
  • English: Without collapse, only the outer loop is parallelized, so the workload may not be fully distributed.
1
2
3
4
5
6
7
8
9
#pragma omp parallel for
for (int i=0;i<N;i++)
for (int j=0;j<N;j++)
work(i,j); // ❌ 内层没并行
// ✅
#pragma omp parallel for collapse(2)
for (int i=0;i<N;i++)
for (int j=0;j<N;j++)
work(i,j);

7. schedule 不当

  • schedule(static) 适合工作量均匀的循环;
  • schedule(dynamic, chunk) 适合负载不均的循环;
  • 乱用可能导致线程负载不平衡。
  • English: Using the wrong scheduling strategy can cause load imbalance, where some threads finish early and others are overloaded.

8.乱用 critical / atomic

  • critical 会严重拖慢性能,如果只是单行更新 → 用 atomic。

  • English: critical forces threads to serialize access, which is slower; if only a single statement is updated, atomic is better.

1
2
3
4
5
6
7
8
#pragma omp parallel for
for (int i=0;i<N;i++) {
#pragma omp critical // ❌ 太重
sum += A[i];
}
// ✅ 更快:
#pragma omp atomic
sum += A[i];

9.输出打印错乱

  • printf 是共享 I/O,多个线程同时写会交错。
  • English: Standard output is shared; multiple threads writing at the same time will interleave their messages.
  • 如果要有序输出,用 ordered 或加锁。
1
2
3
4
5
#pragma omp parallel for ordered
for (int i=0;i<8;i++) {
#pragma omp ordered
printf("i=%d\n", i);
}

10.忘记设置默认共享/私有变量(default 子句)

  • x 是共享变量(shared),但被多个线程同时修改

  • x is a shared variable, but it is being modified simultaneously by multiple threads.

1
2
3
4
5
6
int x = 10, y = 20;
#pragma omp parallel
{
x += omp_get_thread_num(); // ❌ 数据竞争
printf("Thread %d: x=%d\n", omp_get_thread_num(), x);
}
  • ✅解决办法,明确写出
1
#pragma omp parallel default(none) private(x) shared(y)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 每个线程有自己的 x 副本:
int x = 10, y = 20;
#pragma omp parallel private(x)
{
x = 10; // 每个线程初始化自己的副本
x += omp_get_thread_num();
printf("Thread %d: x=%d\n", omp_get_thread_num(), x);
}

// x 是共享的,但所有线程的更新结果能累加:
int x = 10;
#pragma omp parallel reduction(+:x)
{
x += omp_get_thread_num();
}
printf("Final x = %d\n", x);

11. PPT

  1. #pragma omp parallel for num_threads(P)
      for (int i = 0; i < N; i++) {
    #pragma omp task
        h(i);
      }
    
    1
    2
    3
    4
    5
    6
    7

    - 在 OpenMP 中,如果没有明确的同步机制(如 `taskwait` 或 `barrier`),父任务(在这里是循环迭代)可以在子任务(h(i))完成前就结束。
    - 当所有循环迭代完成后,parallel 区域会立即结束。此时,可能有许多任务(h(i))尚未执行完成。
    - Explicit synchronization mechanisms (such as taskwait or barrier) cause the parallel region to end immediately after all loop iterations are completed. At this point, many tasks (h(i)) may not have finished executing.

    **在 single 区域中创建任务并等待**:

    #pragma omp parallel num_threads(P) { #pragma omp single { for (int i = 0; i < N; i++) { #pragma omp task h(i); } #pragma omp taskwait // 等待所有任务完成 } }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55




    ## AVX2

    SIMD(Single Instruction, Multiple Data)单指令多数据

    #### 指令分类体系

    AVX2 指令可以按功能和数据类型分为以下几大类:

    1. **数据移动指令**
    - 加载:从内存到寄存器
    - 存储:从寄存器到内存
    - 设置:直接设置寄存器值
    - 混洗:在寄存器内部重新排列数据
    2. **算术运算指令**
    - 基本算术:加、减、乘
    - 融合乘加:FMA 操作
    - 特殊算术:绝对值、最大值、最小值
    3. **逻辑运算指令**
    - 按位与、或、异或、非
    4. **比较指令**
    - 等于、大于、小于等比较操作
    5. **移位指令**
    - 逻辑移位、算术移位
    6. **类型转换指令**
    - 不同数据类型间的转换

    **Intel Intrinsics Guide**:https://www.intel.com/content/www/us/en/docs/intrinsics-guide/index.html



    | **片段** | **英文缩写来源** | **含义** | **例子** |
    | --------------------------------- | ---------------- | -------------------------------- | ------------------------- |
    | _mm | “multimedia” | 表示 SSE/AVX 指令 | _mm_add_ps, _mm256_mul_pd |
    | 128 / 256 / 512 | 位宽 | 使用的寄存器宽度 | __m128, __m256, __m512 |
    | add, mul, sub, div, and, or, xor… | 运算名 | 算术或逻辑操作 | _mm256_add_ps(加法) |
    | ps | Packed Single | 8 个 float(单精度) | |
    | pd | Packed Double | 4 个 double(双精度) | |
    | epi8 / epi16 / epi32 / epi64 | Packed Integer | 8 位 / 16 位 / 32 位 / 64 位整数 | |

    ```c
    // 以 _mm256_add_epi32 为例:
    __m256i _mm256_add_epi32 (__m256i a, __m256i b)
    // 返回类型:__m256i
    // 函数名:_mm256_ + 操作(add) + 数据类型(epi32)
    // 参数:两个 __m256i 向量具体操作

    _mm256_add_epi32
    | | | └── 每个元素是 32 位整数
    | | └── 操作:加法
    | └── 使用 256 位寄存器
    └── SIMD 内建函数(多媒体扩展)
符号 英文全称 含义 示例
s single 单精度浮点数(float, 32-bit) _mm256_add_ps
d double 双精度浮点数(double, 64-bit) _mm256_add_pd
i integer 整数类型 _mm256_add_epi32
p packed “打包”操作(向量运算,多个元素同时操作) _mm256_add_ps(8个float)
s (第二个) scalar 标量操作,只作用于第一个元素 _mm_add_ss(仅第一个float)
u unaligned 不对齐内存访问 _mm256_loadu_ps
l low 操作低半部分(低位元素) _mm_unpacklo_ps
h high 操作高半部分(高位元素) _mm_unpackhi_ps
r reversed 按相反顺序排列元素 _mm256_permute2f128_ps(..., ..., 1)

具体指令

1.1 加载内存(load、loadu)
  • 内存中的数据读取到CPU 的 SIMD 寄存器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <immintrin.h>
#include <stdio.h>

void basic_load_demo() {
// 1. 准备对齐的内存(32字节对齐)
int32_t aligned_data[8] __attribute__((aligned(32))) = {1, 2, 3, 4, 5, 6, 7, 8};

// 2. 对齐加载 - 最快!(但是碰到未对齐的肯能会出错)
__m256i vec1 = _mm256_load_si256((__m256i*)aligned_data);

// 3. 未对齐加载 - 更通用
int32_t unaligned_data[8] = {10, 20, 30, 40, 50, 60, 70, 80};
__m256i vec2 = _mm256_loadu_si256((__m256i*)unaligned_data);

printf("数据加载完成!\n");
}
对齐是什么?
  • 内存是怎么存数据的?

我们知道内存的地址是一个一个字节排列的,比如:

1
0x1000, 0x1001, 0x1002, 0x1003, 0x1004, ...

每个地址存放一个字节(byte)。

假设我们有一个 int(4字节),它可能存放在:

1
地址 0x1000 ~ 0x1003
  • 对齐的核心概念

    对齐(alignment) 就是让数据的起始地址是它大小的整数倍。
    假设一行内存是 64 字节(两个 32 字节块):

1
2
3
|<-------- 32 字节对齐区域 1 -------->|<-------- 32 字节对齐区域 2 -------->|
0x1000 0x1020 0x1040
|-----------数据块A------------------|-----------数据块B------------------|

如果数据正好从 0x1000 开始,就是对齐的 ✅

如果数据从 0x1004 开始,就会跨越两个 32 字节块

  • 举例 :AVX 256位寄存器(32 字节)对齐 = 起始地址 % 32 == 0

因为 AVX2 的寄存器一次处理 256 位 = 32 字节,

所以我们希望数据的地址是 32 的倍数

地址 是否 32 字节对齐
0x1000 ✅ 对齐(1000 mod 32 = 0)
0x1010 ❌ 不对齐(1010 mod 32 = 16)
0x1020 ✅ 对齐(1020 mod 32 = 0)

CPU 取数据时,就得访问两次内存(性能下降,甚至报错)。

  • C语言中的内存对齐实现

你可以通过两种方式保证数组对齐:

方式 1:使用 GCC/Clang 属性

1
2
3
int x __attribute__((aligned(32)));
float y[8] __attribute__((aligned(32)));
int32_t aligned_data[8] __attribute__((aligned(32))) = {1,2,3,4,5,6,7,8};

这行代码告诉编译器:

​ “请让 aligned_data 的起始地址是 32 的倍数。”

你可以验证:

1
2
3
4
5
6
7
8
#include <stdio.h>
#include <stdint.h>

int main() {
int32_t aligned_data[8] __attribute__((aligned(32))) = {1,2,3,4,5,6,7,8};
printf("地址: %p\n", (void*)aligned_data);
printf("是否32字节对齐: %s\n", ((uintptr_t)aligned_data % 32 == 0) ? "是" : "否");
}

输出示例:

1
2
地址: 0x7ffeefbff480
是否32字节对齐: 是
  • 方式 2:使用 _mm_malloc
1
type* x = (type*) _mm_malloc(size * sizeof(type), 32);
1
2
3
4
5
6
7
8
#include <immintrin.h>
#include <stdlib.h>

int main() {
int *data = (int*)_mm_malloc(8 * sizeof(int), 32); // 第二个参数=对齐要求
printf("地址: %p\n", (void*)data);
_mm_free(data);
}

这样分配的内存一定是 32 字节对齐的。

1.2 创建向量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void set_instructions_demo() {
// 方法1:设置所有元素为同一个值
__m256i all_ones = _mm256_set1_epi32(1); // 所有8个int32都是1
__m256i all_fives = _mm256_set1_epi8(5); // 所有32个int8都是5

// 方法2:逐个元素设置(顺序从右到左!)
__m256i custom_vec = _mm256_set_epi32(7, 6, 5, 4, 3, 2, 1, 0);
// 注意:索引 7 6 5 4 | 3 2 1 0
// 高位 ←--------→ 低位

// 方法3:设置零向量
__m256i zero_vec = _mm256_setzero_si256();

printf("向量创建完成!\n");
}
1.3 储存指令(store)
  • CPU 寄存器中的数据 写回到 内存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void store_instructions_demo() {
// 创建一个测试向量
__m256i src_vec = _mm256_set_epi32(100, 200, 300, 400, 500, 600, 700, 800);

// 准备目标内存
int32_t aligned_dst[8] __attribute__((aligned(32)));
int32_t unaligned_dst[8];

// 对齐存储
_mm256_store_si256((__m256i*)aligned_dst, src_vec);

// 未对齐存储
_mm256_storeu_si256((__m256i*)unaligned_dst, src_vec);

// 验证结果
printf("存储结果:");
for(int i = 0; i < 8; i++) {
printf("%d ", aligned_dst[i]);
}
printf("\n");
}
2.1 基本算术运算
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void arithmetic_demo() {
int32_t a[8] = {10, 20, 30, 40, 50, 60, 70, 80};
int32_t b[8] = {1, 2, 3, 4, 5, 6, 7, 8};
int32_t result[8];

__m256i vec_a = _mm256_loadu_si256((__m256i*)a);
__m256i vec_b = _mm256_loadu_si256((__m256i*)b);

// 加法 - 同时进行8个32位整数的加法
__m256i vec_add = _mm256_add_epi32(vec_a, vec_b);

// 减法
__m256i vec_sub = _mm256_sub_epi32(vec_a, vec_b);

// 乘法(注意:这是低半部分乘法)
__m256i vec_mul = _mm256_mullo_epi32(vec_a, vec_b);

// 存储并显示加法结果
_mm256_storeu_si256((__m256i*)result, vec_add);
printf("加法结果:");
for(int i = 0; i < 8; i++) {
printf("%d ", result[i]);
}
printf("\n");
}
  • 数组求和
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
int32_t sum_array_avx2(int32_t* array, size_t size) { //size得是8的倍数
// 初始化累加器为0
__m256i sum_vec = _mm256_setzero_si256();

// 每次处理8个元素(因为 8 × 4 字节 = 32 字节 = 一个 AVX 寄存器宽度)
for(size_t i = 0; i < size; i += 8) {
__m256i data = _mm256_loadu_si256((__m256i*)&array[i]);
sum_vec = _mm256_add_epi32(sum_vec, data);
}

// 将向量中的8个值水平求和为1个值
int32_t result[8];
_mm256_storeu_si256((__m256i*)result, sum_vec);

int32_t final_sum = 0;
for(int i = 0; i < 8; i++) {
final_sum += result[i];
}

return final_sum;
}
2.2 逻辑运算

命名规则总结

结构 含义
mm256 使用 AVX 256-bit 宽寄存器
cmp compare 比较
eq / gt / lt 操作类型(相等、大于、小于)
epi32 / ps / pd 数据类型(int32 / float / double)
浮点数比较函数族

(_mm256_cmp_ps / _mm256_cmp_pd)

1
2
__m256  _mm256_cmp_ps (__m256  A, __m256  B, const int op);   // 单精度 float
__m256d _mm256_cmp_pd (__m256d A, __m256d B, const int op); // 双精度 double
  • 参数 A, B :要比较的两个向量。
  • 参数 op :指定比较操作(见下表)。
  • 返回值:一个掩码向量,每个元素要么是 0xFFFFFFFF(真),要么是 0x00000000(假)。

✅ 常用比较操作(op取值)

常量名 含义 说明
_CMP_EQ_OQ A == B 相等
_CMP_LT_OS A < B 小于
_CMP_LE_OS A ≤ B 小于等于
_CMP_GT_OS A > B 大于
_CMP_GE_OS A ≥ B 大于等于
_CMP_NEQ_UQ A != B 不相等
_CMP_ORD_Q 两者都不是 NaN 有序比较
_CMP_UNORD_Q 任一为 NaN 无序比较
  • OS 表示 Ordered Signaling(有序比较,遇到 NaN 会抛异常)
  • OQ 表示 Ordered Quiet(遇到 NaN 不会抛异常)

Exemple:

1
2
3
__m256 A = _mm256_set_ps(8,7,6,5,4,3,2,1);
__m256 B = _mm256_set_ps(1,2,3,4,5,6,7,8);
__m256 mask = _mm256_cmp_ps(A, B, _CMP_GT_OS);

结果:

1
mask = [FFFFFFFF, FFFFFFFF, FFFFFFFF, FFFFFFFF, 00000000, 00000000, 00000000, 00000000]

代表前四个元素 A > B 为真,后面为假。

整数比较函数族

(_mm256_cmp* epi*)

整数没有第三个参数 op,而是根据函数名来区分操作类型。

✅ 常见函数(只有== 和 > 比较)

函数名 数据类型 含义
_mm256_cmpeq_epi8 每个元素 8 位 相等比较
_mm256_cmpeq_epi16 每个元素 16 位 相等比较
_mm256_cmpeq_epi32 每个元素 32 位 相等比较
_mm256_cmpeq_epi64 每个元素 64 位 相等比较
_mm256_cmpgt_epi8 每个元素 8 位 大于比较
_mm256_cmpgt_epi16 每个元素 16 位 大于比较
_mm256_cmpgt_epi32 每个元素 32 位 大于比较
_mm256_cmpgt_epi64 每个元素 64 位 大于比较

Exemple:

1
2
3
4
5
__m256i a = _mm256_set_epi32(1, 2, 3, 4, 5, 6, 7, 8);
__m256i b = _mm256_set_epi32(1, 9, 3, 9, 5, 9, 7, 9);

__m256i eq_mask = _mm256_cmpeq_epi32(a, b);
__m256i gt_mask = _mm256_cmpgt_epi32(a, b);

输出(掩码):

1
eq_mask = [FFFFFFFF, 00000000, FFFFFFFF, 00000000, FFFFFFFF, 00000000, FFFFFFFF, 00000000]

即 a 与 b 在索引 0、2、4、6 相等。

  • 比较两个数组是否相同
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <immintrin.h>  // AVX/AVX2 指令集
#include <cstdint> // int32_t、uint8_t 等类型
#include <cstdio> // printf

void comparison_demo() {
int32_t a[8] = {1, 2, 3, 4, 5, 6, 7, 8};
int32_t b[8] = {1, 9, 3, 9, 5, 9, 7, 9};
uint8_t mask[32]; // 比较结果掩码

__m256i vec_a = _mm256_loadu_si256((__m256i*)a);
__m256i vec_b = _mm256_loadu_si256((__m256i*)b);

// 比较是否相等(逐元素比较)
// 相等 → 对应32位元素全为1(二进制全1 = 0xFFFFFFFF)
// 不相等 → 对应元素全为0
// 比较是否相等,返回掩码(相等的位置为0xFF,不相等为0x00)
__m256i cmp_result = _mm256_cmpeq_epi32(vec_a, vec_b);

_mm256_storeu_si256((__m256i*)mask, cmp_result);

printf("比较结果(0xFF表示相等):\n");
for(int i = 0; i < 8; i++) {
printf("a[%d]=%d, b[%d]=%d, mask=%d\n",
i, a[i], i, b[i], mask[i*4]); // 注意:每个int32对应4个字节
}
}
2.3 置换/重排 permute

permute 置换函数
在 SIMD(比如 AVX)中,重新排列寄存器中的元素顺序 “打乱 / 调整 向量里的元素顺序”
在 SIMD 中,很多算法(比如矩阵运算、排序、卷积、混洗)需要把向量的不同部分重排。

固定索引(imm8)
  • 函数:_mm256_permute4x64_pd / _mm256_permute4x64_epi64
  • 元素数:4(每个 64-bit double 或 int64)
  • 控制方式:编译期常数 imm8(8 位),每两位控制一个输出元素取自 A 的哪个位置
  • 用法示例:
1
2
__m256d B = _mm256_permute4x64_pd(A, 0b01_00_11_10);
// 输出顺序:B[0]=A[2], B[1]=A[3], B[2]=A[0], B[3]=A[1]
  • 优点:速度快,编译器可以优化
  • 缺点:只能在编译期知道重排模式
1
2
3
4
5
// 0b01_00_11_10 ,0b表示二进制
B[0] = A[ imm8[1:0] ]
B[1] = A[ imm8[3:2] ]
B[2] = A[ imm8[5:4] ]
B[3] = A[ imm8[7:6] ]
1
2
3
4
imm8[7:6] = 01 -> B[3] = A[1]
imm8[5:4] = 00 -> B[2] = A[0]
imm8[3:2] = 11 -> B[1] = A[3]
imm8[1:0] = 10 -> B[0] = A[2]

注意顺序是从低位 [1:0] 对应 B[0],高位 [7:6] 对应 B[3]。

可变索引(idx)
  • 函数:_mm256_permutevar8x32_ps / _mm256_permutevar8x32_epi32
  • 元素数:8(每个 32-bit float 或 int32)
  • 控制方式:寄存器 idx,每个元素告诉输出该位置取 A 的哪个元素
  • 用法示例:
1
2
__m256i idx = _mm256_set_epi32(7,6,5,4,3,2,1,0);
__m256 B = _mm256_permutevar8x32_ps(A, idx); // 完全反转
  • 优点:运行期动态决定重排
  • 缺点:比固定索引略慢

Exemple:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 假设有一个寄存器 A *注意 _mm256_set_ps 第一个参数是最高位 A7,最后一个是 A0。
__m256 A = _mm256_set_ps(10,20,30,40,50,60,70,80);
// A = [A7, A6, A5, A4, A3, A2, A1, A0]
// 10, 20, 30, 40, 50, 60, 70, 80

// 完全反转
// 创建一个索引寄存器 idx
__m256i idx = _mm256_set_epi32(7,6,5,4,3,2,1,0);
__m256 B = _mm256_permutevar8x32_ps(A, idx);
// B[0] = A[idx[0]] = A[7] = 10
// B[1] = A[idx[1]] = A[6] = 20
// …
// B[7] = A[idx[7]] = A[0] = 80
2.4 Set

mm256_set*

这些函数用来 初始化 AVX 寄存器(__m256 或 __m256i),手动指定每个元素的值。

函数 寄存器类型 元素类型 元素个数 功能
_mm256_set_ps(f0,…,f7) __m256 float32 8 设置 8 个 float 元素(f7 → 高位 A7,f0 → 低位 A0)
_mm256_set_pd(d0,…,d3) __m256d double64 4 设置 4 个 double 元素(d3 → 高位 A3,d0 → 低位 A0)
_mm256_set_epi64x(i0,…,i3) __m256i int64 4 设置 4 个 int64 元素(i3 → 高位 A3,i0 → 低位 A0)
_mm256_set_epi32(i0,…,i7) __m256i int32 8 设置 8 个 int32 元素
_mm256_set_epi16(i0,…,i15) __m256i int16 16 设置 16 个 short 元素
_mm256_set_epi8(i0,…,i31) __m256i int8 32 设置 32 个 char 元素
  • 注意顺序:第一个参数对应 最高位元素,最后一个对应 最低位元素

mm256_set1*

  • 功能:把同一个值填充到寄存器所有元素
1
2
3
__m256 v = _mm256_set1_ps(3.14f);   // 所有 8 个 float 都是 3.14
__m256d vd = _mm256_set1_pd(2.718); // 所有 4 个 double 都是 2.718
__m256i vi = _mm256_set1_epi32(7); // 所有 8 个 int32 都是 7
2.5 其他指令
1
2
3
4
5
6
7
8
9
10
// 基本 FMA 操作(乘加)
// float
__m256 _mm256_fmadd_ps(__m256 a, __m256 b, __m256 c); // a * b + c
__m256 _mm256_fmsub_ps(__m256 a, __m256 b, __m256 c); // a * b - c
__m256 _mm256_fnmadd_ps(__m256 a, __m256 b, __m256 c); // -(a * b) + c
__m256 _mm256_fnmsub_ps(__m256 a, __m256 b, __m256 c); // -(a * b) - c
// double
__m256d _mm256_fmadd_pd(__m256d a, __m256d b, __m256d c); // a * b + c
__m256d _mm256_fmsub_pd(__m256d a, __m256d b, __m256d c); // a * b - c
...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// hadd 水平加法
_mm256_hadd_pd(A, B) // ( Horizontal-Add-Packed-Double )
// Entree: [ A0, A1, A2, A3 ], [ B0, B1, B2, B3 ]
// Sortie: [ A0 + A1, B0 + B1, A2 + A3, B2 + B3 ]
_mm256_hadd_ps(A, B) // ( Horizontal-Add-Packed-Single )
// A = [A0, A1, A2, A3, A4, A5, A6, A7], B = [B0, B1, B2, B3, B4, B5, B6, B7]
// Sortie: [A0+A1, A2+A3, B0+B1, B2+B3, A4+A5, A6+A7, B4+B5, B6+B7]
_mm256_hadd_epi32(A,B)
// A = [A0, A1, A2, A3, A4, A5, A6, A7]
// B = [B0, B1, B2, B3, B4, B5, B6, B7]
// C = [A0+A1, A2+A3, B0+B1, B2+B3, A4+A5, A6+A7, B4+B5, B6+B7]
_mm256_hadd_epi16(A,B)
// A = [A0, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15]
// B = [B0, B1, B2, B3, B4, B5, B6, B7, B8, B9, B10, B11, B12, B13, B14, B15]
// C = [A0+A1, A2+A3, B0+B1, B2+B3, A4+A5, A6+A7, B4+B5, B6+B7, A8+A9, A10+A11, B8+B9, B10+B11, A12+A13, A14+A15, B12+B13, B14+B15]
3 常见错误

📋 SIMD/AVX 错误类型总结表(双语版)

错误类型 典型表现 修正方法 英语错误描述
内存对齐Memory Alignment 使用 _load_ps 但数据未对齐 使用 _loadu_ps 或对齐数据 Unaligned memory access - _mm256_load_ps requires 32-byte aligned memory, but the data is not properly aligned
**数据类型Data Type __m256 用于 double,_ps 用于 double 使用正确的类型和函数后缀 Data type mismatch - Using __m256 (float) for double precision data, should use __m256d
边界处理Boundary Handling 循环步长导致数组越界 添加剩余元素处理逻辑 Array out-of-bounds access - Loop stride causes reading beyond array boundaries, missing tail processing
**类型转换Type Conversion 错误的指针强制转换 使用正确的 intrinsic 函数 Incorrect pointer casting - Dangerous pointer casting instead of using proper intrinsic functions
**编译选项Compilation Flags 编译时缺少 -mavx2 添加编译标志 Missing compilation flag - AVX/AVX2 intrinsics require -mavx2 or -mavx compilation flag
**函数后缀Function Suffix _ps 用于整数操作,_epi32 用于浮点数 使用正确的操作后缀 Incorrect function suffix - Using _ps (packed single) for integer operations, should use _epi32
**寄存器使用Register Usage 混合使用不同宽度的寄存器 保持寄存器宽度一致 Register width inconsistency - Mixing different register widths (e.g., __m128 with __m256)
题目5 未初始化使用 Uninitialized variable - using SIMD register without proper initialization

LAB

LAB1

Ex1 HelloWorld

截屏2025-10-07 10.13.20
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <cstdio>
#include <iostream>
#include "omp.h"

/** The order of priority for determining the number of threads is
* 1) num_threads() clause
* 2) omp_set_num_threads() function
* 3) OMP_NUM_THREADS environment variable
*/

int main()
{
// (c) 三种修改线程数的方法
// OMP_NUM_THREADS=3 ./hello-openmp // Lowest priority 使用环境变量
omp_set_num_threads(5); // Medium priority
#pragma omp parallel num_threads(7) // High priority
{
// (a) 打印每个thread的id和总线程数
printf("Thread %d/%d\n", omp_get_thread_num(), omp_get_num_threads());

// (b) 使用 single 和 master 结构,观察不同
#pragma omp single // Only one (arbitrary) thread executes
//#pragma omp master // Only thread 0 executes
printf("Hello from thread %d\n", omp_get_thread_num());

}
return 0;
}

Ex2 计算数组和

截屏2025-10-07 10.32.58

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#include <chrono>
#include <iostream>
#include <vector>
#include "omp.h"

int main()
{
int i;
int N = 10000000;
std::vector<double> A(N);
double sum = 0.0;

auto start = std::chrono::high_resolution_clock::now();

//====================================TODO=====================================
// (a) 初始化数组
#pragma omp parallel default(none) shared(A, sum, i, N)
{
#pragma omp for
for (i = 0; i < N; i++) {
A[i] = (double)i;
} // Implicit barrier 注意omp for 后有隐藏barrier

// (b,c)
// Sum with omp for 使用omp for reduction 计算数组sum
#pragma omp for reduction(+:sum)
for (i = 0; i < N; i++) {
sum += A[i];
}

// (d) 使用四个 section 计算 sum,每一部分计算 N/4
// Sum with four sections / Somme avec quatre sections
#pragma omp sections
{
#pragma omp section
{
double localSum = 0;
for (int i = 0; i < N / 4; i++) { localSum += A[i]; }
// We can put critical in all sections as well but atomic is faster for a simple add
//#pragma omp critical
#pragma omp atomic
sum += localSum;
}
#pragma omp section
{
double localSum = 0;
for (int i = N / 4; i < (2 * N) / 4; i++) { localSum += A[i]; }

#pragma omp atomic
sum += localSum;
}
#pragma omp section
{
double localSum = 0;
for (int i = (2 * N) / 4; i < (3 * N) / 4; i++) { localSum += A[i]; }

#pragma omp atomic
sum += localSum;
}
#pragma omp section
{
double localSum = 0;
for (int i = (3 * N) / 4; i < N; i++) { localSum += A[i]; }

#pragma omp atomic
sum += localSum;
}
}
}

//=================================================================================
std::cout << "Sum is " << sum << std::endl;
std::chrono::duration<double> time = std::chrono::high_resolution_clock::now() - start;
std::cout << "Execution time: " << time.count() << "s\n";

return 0;
}

Ex3 归并排序

截屏2025-10-07 11.03.12

截屏2025-10-07 11.04.05

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <omp.h>
#include <assert.h>
#include <algorithm>

#define SWAP(a,b) {int tmp = a; a = b; b = tmp;}
#define SIZE 1024

void verify(int* a, int size);
void merge(int* a, int size, int* temp);
void mergesort(int* a, int size, int* temp);

int main(int argc, char** argv) {
int* a;
int* temp;
double inicio, fim;
int size = SIZE;
int i;

//setup
if (argc>=2) size = atoi(argv[1]);
printf("Sorting an array of size %d.\n", size);
a = (int*)calloc(size, sizeof(int));
temp = (int*)calloc(size, sizeof(int));
srand(26);
for (i = 0; i<size; ++i) {
a[i] = rand();
}
//====================================TODO==================================
// sort
// mergesort(a, size, temp); temp 临时缓冲区
#pragma omp parallel default(none) num_threads(4) shared(a, size, temp)
{
#pragma omp sections
{
#pragma omp section
{
std::sort(&a[0], &a[size / 4]);
}
#pragma omp section
{
std::sort(&a[size / 4], &a[2 * size / 4]);
}
#pragma omp section
{
std::sort(&a[2 * size / 4], &a[3 * size / 4]);
}
#pragma omp section
{
std::sort(&a[3 * size / 4], &a[size]);
}
} // barriere implicite

#pragma omp sections
{
#pragma omp section
{
merge(a, size / 2, temp);
}
#pragma omp section
{
merge(a + size / 2, size / 2, temp + size / 2);
}
} // barriere implicite
} // fin region parallele

merge(temp, size, a);

verify(a, size);
}
//===========================================================
void verify(int* a, int size) {
... 验证是否正确排序
}

void merge(int* a, int size, int* temp) {
... 合并两个已经排序的连续子区间
}

void mergesort(int* a, int size, int* temp) {
...
}

Ex4 计算π截屏2025-10-07 11.32.49

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#include <chrono>
#include <iostream>
#include "omp.h"

inline double f(double x)
{
return (4 / (1 + x * x));
}

int main()
{
int i;
const int N = 100000000;
double pi = 0.0;
double s = 1.0 / (double)N;
//============================TODO===============================
// Calculer le pi en sequentiel
auto start = std::chrono::high_resolution_clock::now();
for (i = 0; i < N; i++) {
pi = pi + s * (f(i * s) + f((i + 1) * s)) / 2;
}
//===========================================================
std::cout << "pi = " << pi << std::endl;
std::chrono::duration<double> tempsOmpFor = std::chrono::high_resolution_clock::now() - start;
std::cout << "Temps parallel omp for: " << tempsOmpFor.count() << "s\n";
//==============================TODO=============================
// Calculer le pi avec omp for et reduction
pi = 0.0;
start = std::chrono::high_resolution_clock::now();
#pragma omp parallel default(none) shared(pi, s)
{
#pragma omp for reduction(+:pi)
for (int i = 0; i < N; i++) {
pi = pi + s * (f(i * s) + f((i + 1) * s)) / 2;
}
}
//==============================================================
std::cout << "pi = " << pi << std::endl;
tempsOmpFor = std::chrono::high_resolution_clock::now() - start;
std::cout << "Temps parallel omp for: " << tempsOmpFor.count() << "s\n";

//=============================TODO==============================
// Calculer le pi avec boucle parallele faite a la main
pi = 0.0;
start = std::chrono::high_resolution_clock::now();
#pragma omp parallel default(none) shared(pi, s)
{
int thId = omp_get_thread_num(); // 当前线程编号0~numTh-1
int numTh = omp_get_num_threads(); // 线程总数
int elemParTh = N / numTh; // 每个线程负责的元素数量(肯能有余数)
int thDebut = thId * elemParTh; // 当前线程负责区间的起始索引
int thFin; // 结束索引,因为肯能有剩余,暂未定
if (thId == numTh - 1) { thFin = N; } // 让最后一个线程兜底
else { thFin = thDebut + elemParTh; }
double piLocal = 0.0; //创建一个局部累加器储存 pi,防止对shared变量频繁写入
for (int i = thDebut; i < thFin; i++) {
piLocal = piLocal + s * (f(i * s) + f((i + 1) * s)) / 2;
}
#pragma omp atomic
pi += piLocal; //把最后结果安全的加到 pi 上
}
// ==============================================================
std::cout << "pi = " << pi << std::endl;
std::chrono::duration<double> tempsForMain = std::chrono::high_resolution_clock::now() - start;
std::cout << "Temps parallel for a la main: " << tempsForMain.count() << "s\n";

return 0;
}

LAB2

Ex1 哥德巴赫猜想

截屏2025-10-07 12.24.45

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#include <iostream>
#include <chrono>
#include <cmath>
#include "omp.h"

#define N 8192

/**
* Calculer si x est un nombre premier ou pas
*/
bool estPremier(int x)
{
if (x < 2) { return false; }
for (int i = 2; i <= x / 2; i++) {
if (x % i == 0) { return false; }
}
return true;
}

/**
* Calculer le nombre de paires (i, j) tel que i + j = x et i et j sont des nombres premiers.
*/
int goldbach(int x)
{
int compte = 0;
if (x <= 2) { return 0; }
for (int i = 2; i <= x / 2; i++) {
if (estPremier(i) && estPremier(x - i)) { compte++; }
}
return compte;
}

int main()
{
int goldbachVrai;
int numPairs[N];
for (int i = 2; i < N; i += 2) { numPairs[i] = 0; }

// La version sequentielle
goldbachVrai = 1;
for (int i = 2; i < N; i += 2) {
numPairs[i] = goldbach(i);
if (numPairs[i] == 0) { goldbachVrai = 2; }
}

// 使用shedule(...)
#pragma omp parallel for
for (int i = 2; i < N; i += 2) {
numPairs[i] = goldbach(i);
}

#pragma omp parallel for schedule(static, 32)
for (int i = 2; i < N; i += 2) {
numPairs[i] = goldbach(i);
}

#pragma omp parallel for schedule(dynamic)
for (int i = 2; i < N; i += 2) {
numPairs[i] = goldbach(i);
}

#pragma omp parallel for schedule(dynamic, 32)
for (int i = 2; i < N; i += 2) {
numPairs[i] = goldbach(i);
}

#pragma omp parallel for schedule(guided)
for (int i = 2; i < N; i += 2) {
numPairs[i] = goldbach(i);
}

#pragma omp parallel for schedule(guided, 32)
for (int i = 2; i < N; i += 2) {
numPairs[i] = goldbach(i);
}


return 0;
}

Ex2 斐波那契数列

截屏2025-10-07 11.57.25

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include <cstdio>
#include <cstdlib>
#include "omp.h"

int fib(int n)
{
// fib(0) = 0, fib(1) = 1
if (n < 2) { return n; }
int x, y;

// Create a task for computing fib(n - 2). The task needs a shared access to x of the parent thread to return the result. If n is too small, do not create a separate task (to avoid task creation overhead)
#pragma omp task shared(x) if(n > 14)
x = fib(n - 2);

// Here, we can create a task for fib(n - 1). However, since the parent thread will only wait afterwards, it would be more optimized to not create a task, and let the parent thread compute fib(n - 1) instead.
// #pragma omp task shared(y)
y = fib(n - 1);

#pragma omp taskwait
return x + y;
}

void printUsage(int argc, char **argv)
{
printf("Usage: %s N\n", argv[0]);
printf("Example: %s 13\n", argv[0]);
}

int main(int argc, char **argv)
{

if (argc < 2) {
printUsage(argc, argv);
return 0;
}

// Read the index of the Fibonacci number to compute
const int N = atoi(argv[1]);

#pragma omp parallel shared(N)
{
#pragma omp single
printf("fib[%d] = %d\n", N, fib(N));
}

return 0;
}

截屏2025-10-07 11.57.45

合理运用depend(in : ) depend(out : )

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#include <cstdio>
#include <cstdlib>
#include "omp.h"

void printUsage(int argc, char **argv)
{
printf("Usage: %s N\n", argv[0]);
printf("Example: %s 13\n", argv[0]);
}

int main(int argc, char **argv)
{
// Check the validity of command line arguments and print usage if invalid
// Verifier la validite des arguments fournis a la ligne de commande et afficher l'utilisation si invalid
if (argc < 2) {
printUsage(argc, argv);
return 0;
}

// Read the index of the Fibonacci number to compute
// Lire l'indice du nombre Fibonacci a calculer
const int N = atoi(argv[1]);

// Allocate and initialize the array containing Fibonacci numbers
// Allouer et initialiser le tableau contenant les nombres de Fibonacci
int fib[1000];
fib[0] = 0;
fib[1] = 1;
//==============================================================================
// TODO / A FAIRE ...
#pragma omp parallel default(none) shared(fib, N)
{
// Create tasks by a single thread only
#pragma omp single
for (int i = 2; i < N; i++) {
// Create a task for computing fib[i]. The task depends on fib[i - 1] and fib[i - 2] (input dependency) and modifies fib[i] (output dependency).
#pragma omp task shared(fib) firstprivate(i) depend(in:fib[i - 1], fib[i - 2]) depend(out:fib[i])
{
fib[i] = fib[i - 1] + fib[i - 2];
}
}
}
//==============================================================================
// Print all computed Fibonacci numbers until n
// Afficher tous les nombres Fibonacci jusqu'a n
printf("Fibonacci numbers: ");
for (int i = 0; i < N; i++) { printf("%d ", fib[i]); }
printf("\n");
return 0;
}

Ex3 计算矩阵向量乘法

截屏2025-10-07 12.05.01

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#include <iostream>
#include <chrono>
#include <vector>
#include "omp.h"

#define NREPET 1024

int main(int argc, char **argv)
{
std::cout << "Produit matrice-vecteur avec OpenMP\n";
if (argc < 2) {
std::cout << "Utilisation: " << argv[0] << " [num-lignes/colonnes]\n";
std::cout << " Example: " << argv[0] << " 1024\n";
return 1;
}
int dim = std::atoi(argv[1]);

std::vector<double> A(dim * dim);
std::vector<double> x(dim);
std::vector<double> b(dim);

// Initialiser A et x tel que A(i, j) = i + j et b(j) = 1.
for (int i = 0; i < dim; i++) {
for (int j = 0; j < dim; j++) {
A[i * dim + j] = i + j;
}
x[i] = 1;
}

// Calculer b = A * x NREPET fois en parallele en sequentiel
auto start = std::chrono::high_resolution_clock::now();
// Calculer b = A * x NREPET fois en parallele
for (int repet = 0; repet < NREPET; repet++) {
for (int i = 0; i < dim; i++) {
b[i] = 0;
for (int j = 0; j < dim; j++) {
b[i] += A[i * dim + j] * x[j];
}
}
}
std::chrono::duration<double> tempsSeq = std::chrono::high_resolution_clock::now() - start;
std::cout << std::scientific << "Temps d'execution sequentiel: " << tempsSeq.count() / NREPET << "s" << std::endl;

// Calculer b = A * x NREPET fois en parallele
start = std::chrono::high_resolution_clock::now();
for (int repet = 0; repet < NREPET; repet++) {
#pragma omp parallel for if (dim >= 64)
for (int i = 0; i < dim; i++) {
b[i] = 0;
for (int j = 0; j < dim; j++) {
b[i] += A[i * dim + j] * x[j];
}
}
}
std::chrono::duration<double> tempsPar = std::chrono::high_resolution_clock::now() - start;
std::cout << std::scientific << "Temps d'execution parallele avec omp for: " << tempsPar.count() / NREPET << "s" <<
std::endl;

// Calculer b = A * x NREPET fois en parallele avec omp task
start = std::chrono::high_resolution_clock::now();
for (int repet = 0; repet < NREPET; repet++) {
#pragma omp parallel
{
// Les taches sont generees par un thread dans une region parallele mais executees par tous les threads
#pragma omp single
for (int i = 0; i < dim; i++) {
#pragma omp task firstprivate(i) // Pour produit scalaire b[i] on genere une tache.
{
double res = 0.0; // Il faut travailler sur une variable temporaire pour eviter le faux partage
for (int j = 0; j < dim; j++) {
res += A[i * dim + j] * x[j];
}
b[i] = res; // Une seule ecriture qui evite le faux partage
}
}
}
}
std::chrono::duration<double> tempsParTasks = std::chrono::high_resolution_clock::now() - start;
std::cout << std::scientific << "Temps d'execution parallele avec omp tasks: " << tempsParTasks.count() / NREPET << "s" << std::endl;


// Calculer et afficher l'acceleration et l'efficacite de la parallelisation avec omp for
int numThreads;
#pragma omp parallel
#pragma omp single
numThreads = omp_get_num_threads(); // Attention, dans un region sequentiel omp_get_num_threads() rend 1
std::cout << "Acceleration: " << tempsSeq.count() / tempsPar.count() << std::endl;
std::cout << "Efficacite: " << tempsSeq.count() / (numThreads * tempsPar.count()) << std::endl;

// Verifier le resultat. b(i) est cense etre (dim - 1) * dim / 2 + i * dim
for (int i = 0; i < dim; i++) {
double val = (dim - 1) * (double)dim / 2.0 + (double)i * dim;
if (b[i] != val) {
std::cout << "valeur incorrecte: b[" << i << "] = " << b[i] << " != " << val << std::endl;
break;
}
if (i == dim - 1) {
std::cout << "Produit matrice-vecteur est effectue avec succes!\n";
}
}

return 0;
}

Ex4 归并

截屏2025-10-07 12.13.21

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#include <iostream>
#include <cstdlib>
#include <cstring>
#include <cassert>
#include <chrono>
#include "omp.h"

#define SWAP(a, b) { auto tmp = a; a = b; b = tmp; }
#define SIZE 1024

// Verify if the given array is sorted
bool isSorted(int *a, int size)
{
...
}
// Sort the array a with size elements, using temp as a temporary merge buffer
void merge(int* a, int size, int* temp)
{
...
}

/**
* Sort the array a with size elements, using temp as a temporary merge buffer
*/
void mergesort(int *a, int size, int *temp)
{
if (size < 2) { return; } // Nothing to sort / Rien a trier
if (size == 2) { // Only two elements to sort / Seulement deux elements a trier
if (a[0] <= a[1])
return;
else {
SWAP(a[0], a[1]);
return;
}
} else {
// Perform mergesort on each half array. Parallelize using tasks
// TODO / A FAIRE ...
// Create a task to sort the first half. If the array is too small, do not create a task to avoid the overhead
#pragma omp task if(size > 1000)
mergesort(a, size/2, temp); // first size / 2 elements / les premiers size / 2 elements

//#pragma omp task if(size > 1000) ## 让父进程跑着比较好
mergesort(a + size/2, size - size/2, temp + size/2); // last size / 2 elements

// Merge two sorted half arrays
// TODO / A FAIRE ...
#pragma omp taskwait
merge(a, size, temp);
}
}

void printUsage(int argc, char **argv)
{
printf("Usage: %s [size]\n", argv[0]);
printf("Example: %s 1024\n", argv[0]);
}

int main(int argc, char **argv)
{
int *a;
int *temp;
int size;

// Read array size and initialize
// Lire la taille du tableau et l'initialiser
if (argc < 2) {
printUsage(argc, argv);
return 0;
}
size = atoi(argv[1]);
a = (int *) malloc(size * sizeof(int));
temp = (int *) malloc(size * sizeof(int));
srand(0);
#pragma omp parallel default(none) shared(a, size)
{
#pragma omp for
for (int i = 0; i < size; ++i) {
a[i] = size - i;
}
}

// Sort the array a[size] using mergesort in parallel, using temp as a temporary buffer
// TODO / A FAIRE ...
auto start = std::chrono::high_resolution_clock::now();

#pragma omp parallel default(none) shared(a, size, temp)
{
#pragma omp single
mergesort(a, size, temp);
}

std::chrono::duration<double> sortTime = std::chrono::high_resolution_clock::now() - start;
printf("Sorting took %.4lf seconds.\n", sortTime.count());

// Verify if the array is sorted
// Verifier si le tableau est bien trie
if (isSorted(a, size)) { printf("The array was properly sorted.\n"); }
else { printf("There was an error when sorting the array.\n"); }

// Deallocate the arrays
// Desallouer les tableaux
free(a);
free(temp);

return 0;
}

LAB3

Ex1 复制数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
* Copie d'un tableau dans un autre avec les intrinseques AVX.
* A compiler avec les drapeaux -O2 -mavx2.
*/


int main(int argc, char **argv)
{
if (argc < 2) {
afficherUsage();
return 1;
}
int dim = std::atoi(argv[1]);

// 初始化数组
// Allouer et initialiser deux tableaux de flottants de taille dim alignes par 32 octets
float* tab0;
float* tab1;
tab0 = (float*) _mm_malloc(dim * sizeof(float), 32);
tab1 = (float*) _mm_malloc(dim * sizeof(float), 32);

for (int i = 0; i < dim; i++)
{
tab0[i] = (float)i;
}

// 普通方法
// Copier tab0 dans tab1 de manière scalaire~(code non-vectorise).
for (int i = 0; i < dim; i++)
{
tab1[i] = tab0[i];
}

// AVX方法
// Copier tab0 dans tab1 de maniere vectorisee avec AVX
int i;
// dim-7为了不越界(最后一个i的安全起始是dim-8
// +=8是__m256一次最多处理8个float值
for(i = 0; i < dim - 7; i += 8) {
__m256 r0 = _mm256_load_ps(tab0 + i);
_mm256_store_ps(tab1 + i, r0);
}
// 复制剩余元素
for (; i < dim; i++) { tab1[i] = tab0[i]; }


// AVX展开版本(展开因子是4)
int i;
for(i = 0; i < dim - 31; i += 32) {
__m256 r0 = _mm256_load_ps(tab0 + i); // 处理元素 [i, i+7]
__m256 r1 = _mm256_load_ps(tab0 + i + 8); // 处理元素 [i+8, i+15]
__m256 r2 = _mm256_load_ps(tab0 + i + 16); // 处理元素 [i+16, i+23]
__m256 r3 = _mm256_load_ps(tab0 + i + 24); // 处理元素 [i+24, i+31]
_mm256_store_ps(tab1 + i, r0);
_mm256_store_ps(tab1 + i + 8, r1);
_mm256_store_ps(tab1 + i + 16, r2);
_mm256_store_ps(tab1 + i + 24, r3);
}
// Copier le dernier bout sans vectorisation
for (; i < dim; i++) { tab1[i] = tab0[i]; }


// Desallouer les tableaux tab0 et tab1
// A FAIRE ...
_mm_free(tab0);
_mm_free(tab1);

return 0;
}

Ex2 计算内积

截屏2025-10-14 11.53.41
1
2
3
4
5
6
7
8
9
// 标量版本
inline float produitScalaire(float *A, float *B, int taille)
{
float res = 0.0f;
for (size_t i = 0; i < taille; i++) {
res += A[i] * B[i]; // 累加每个元素的乘积
}
return res;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// AVX向量版本
inline float produitScalaireAVX(float *A, float *B, int taille)
{
__m256 res = _mm256_set1_ps(0.0f); // 初始化一个8个0的向量
for (size_t i = 0; i < taille; i += 8) {
__m256 la = _mm256_load_ps(A + i); // 加载A的8个元素
__m256 lb = _mm256_load_ps(B + i); // 加载B的8个元素
__m256 lab = _mm256_mul_ps(la, lb); // 8个元素并行相乘 lab = la*lb
res = _mm256_add_ps(res, lab); // 累加到结果向量 res = res + lab
}
// 将8个部分和相加得到最终结果
float resTab[8] __attribute__((aligned(32)));
_mm256_store_ps(resTab, res);
return resTab[0] + resTab[1] + resTab[2] + resTab[3] +
resTab[4] + resTab[5] + resTab[6] + resTab[7];
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// AVX + FMA 版本
inline float produitScalaireAVXFMA(float *A, float *B, int taille)
{
__m256 res = _mm256_set1_ps(0.0f);
for (size_t i = 0; i < taille; i += 8) {
__m256 la = _mm256_load_ps(A + i);
__m256 lb = _mm256_load_ps(B + i);
res = _mm256_fmadd_ps(la, lb, res); // 融合乘加:res = la*lb + res
}
// ...最终求和
float resTab[8] __attribute__((aligned(32)));
_mm256_store_ps(resTab, res);
return resTab[0] + resTab[1] + resTab[2] + resTab[3] +
resTab[4] + resTab[5] + resTab[6] + resTab[7];
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// AVX + FMA 的展开版本
inline float produitScalaireAVXFMADeroulement(float *__restrict__ A, float *__restrict__ B, int taille)
{
__m256 res1 = _mm256_set1_ps(0.0f); // 4个独立的累加器
__m256 res2 = _mm256_set1_ps(0.0f);
__m256 res3 = _mm256_set1_ps(0.0f);
__m256 res4 = _mm256_set1_ps(0.0f);
for (size_t i = 0; i < taille; i += 32) { // 一次处理32个元素
// 同时处理四组,每组8个元素
__m256 la1 = _mm256_load_ps(A + i);
__m256 la2 = _mm256_load_ps(A + i + 8);
__m256 la3 = _mm256_load_ps(A + i + 16);
__m256 la4 = _mm256_load_ps(A + i + 24);
__m256 lb1 = _mm256_load_ps(B + i);
__m256 lb2 = _mm256_load_ps(B + i + 8);
__m256 lb3 = _mm256_load_ps(B + i + 16);
__m256 lb4 = _mm256_load_ps(B + i + 24);
res1 = _mm256_fmadd_ps(la1, lb1, res1);
res2 = _mm256_fmadd_ps(la2, lb2, res2);
res3 = _mm256_fmadd_ps(la3, lb3, res3);
res4 = _mm256_fmadd_ps(la4, lb4, res4);
}
// 合并四个累加器
res1 = _mm256_add_ps(res1, res2);
res3 = _mm256_add_ps(res3, res4);
res1 = _mm256_add_ps(res1, res3);
// 最终求和
float resTab[8] __attribute__((aligned(32)));
_mm256_store_ps(resTab, res1);
return resTab[0] + resTab[1] + resTab[2] + resTab[3] +
resTab[4] + resTab[5] + resTab[6] + resTab[7];
}

// 简写版本
inline float produitScalaireAVXFMADeroulement(float *A, float *B, int taille)
{
__m256 res1 = _mm256_set1_ps(0.0f); // 4个独立的累加器
__m256 res2 = _mm256_set1_ps(0.0f);
__m256 res3 = _mm256_set1_ps(0.0f);
__m256 res4 = _mm256_set1_ps(0.0f);

for (size_t i = 0; i < taille; i += 32) { // 每次处理32个元素
// 同时处理4组,每组8个元素
res1 = _mm256_fmadd_ps(_mm256_load_ps(A + i), _mm256_load_ps(B + i), res1);
res2 = _mm256_fmadd_ps(_mm256_load_ps(A + i + 8), _mm256_load_ps(B + i + 8), res2);
res3 = _mm256_fmadd_ps(_mm256_load_ps(A + i + 16), _mm256_load_ps(B + i + 16), res3);
res4 = _mm256_fmadd_ps(_mm256_load_ps(A + i + 24), _mm256_load_ps(B + i + 24), res4);
}

// 合并4个累加器
res1 = _mm256_add_ps(res1, res2);
res3 = _mm256_add_ps(res3, res4);
res1 = _mm256_add_ps(res1, res3);
// ...最终求和
}

Ex3 反转一个数组

数组反转:将数组 [0, 1, 2, 3, 4, 5, 6, 7] 变成 [7, 6, 5, 4, 3, 2, 1, 0]

注意:考虑N > 16

1
2
3
4
5
6
// 标量版本
for (int i = 0; i < dim / 2; i++) {
float temp = tab[i];
tab[i] = tab[dim - 1 - i];
tab[dim - i] = temp;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// AVX2版本 N是8的倍数且大于16
__m256i permIdx = _mm256_set_epi32(0, 1, 2, 3, 4, 5, 6, 7);
// 创建索引向量:[0, 1, 2, 3, 4, 5, 6, 7]
// 注意:_mm256_set_epi32 的参数顺序是从高位到低位
// 实际存储为:[7, 6, 5, 4, 3, 2, 1, 0]

int i = 0; // 左侧指针
int j = dim - 8; // 右侧指针

for (; i < j; i += 8, j -= 8) {
// 加载左侧8个元素,反转,存储到右侧
_mm256_store_ps(tab + j, _mm256_permutevar8x32_ps(_mm256_load_ps(tab + i), permIdx));
// 加载右侧8个元素,反转,存储到左侧
_mm256_store_ps(tab + i, _mm256_permutevar8x32_ps(_mm256_load_ps(tab + j), permIdx));
}

// 处理中间的8个元素(如果数组大小不是16的倍数)
if (i == j) {
_mm256_store_ps(tab + i, _mm256_permutevar8x32_ps(_mm256_load_ps(tab + i), permIdx));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 完善dim小于16和不是8的整数
if (dim < 16) {
// 小数组用标量版本
for (int i = 0; i < dim / 2; i++) {
float temp = tab[i];
tab[i] = tab[dim - 1 - i]; // 修复原始bug
tab[dim - 1 - i] = temp;
}
} else {
// AVX2 处理
__m256i permIdx = _mm256_set_epi32(0, 1, 2, 3, 4, 5, 6, 7);
int i = 0;
int j = dim - 8;

// 处理16元素的完整块
for (; i < j; i += 8, j -= 8) {
// 需要临时存储避免数据竞争
__m256 left = _mm256_load_ps(tab + i);
__m256 right = _mm256_load_ps(tab + j);

_mm256_store_ps(tab + j, _mm256_permutevar8x32_ps(left, permIdx));
_mm256_store_ps(tab + i, _mm256_permutevar8x32_ps(right, permIdx));
}

// 处理中间的8个元素
if (i == j) {
_mm256_store_ps(tab + i, _mm256_permutevar8x32_ps(_mm256_load_ps(tab + i), permIdx));
}

// 处理剩余元素(标量方式)
// 对于 N=20,需要处理中间的4个元素 [8,9,10,11]
if (i > j) {
int start = i;
int end = j + 8;

for (int k = start; k < (start + end) / 2; k++) {
float temp = tab[k];
tab[k] = tab[end - 1 - (k - start)];
tab[end - 1 - (k - start)] = temp;
}
}
}

Ex4 计算矩阵乘向量 B = Ax

$$b[i] = \sum_{j=0}^{N-1} A[i][j] \times x[j]$$

Ann*Xn1=Bn1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 初始化
std::vector<float> A(N * N); // N×N 矩阵(行优先存储)
std::vector<float> x(N); // 输入向量
std::vector<float> b(N); // 结果向量

for (int i = 0; i < N; i++) {
x[i] = 1; // x = [1, 1, 1, ..., 1]
for (int j = 0; j < N; j++) {
A[i * N + j] = i + j; // A[i][j] = i + j
}
}

// e.g. N = 4
A = [0 1 2 3]
[1 2 3 4]
[2 3 4 5]
[3 4 5 6]
1
2
3
4
5
6
// 标量版本
for (int i = 0; i < N; i++) {
for (int j = 0; j < N; j++) {
b[i] = b[i] + A[i * N + j] * x[j]; // 传统的逐元素计算
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// AVX2版本
float bvecarr[8]; // 用于存储向量寄存器的数组
__m256 bvec; // 256位向量寄存器(8个float)
__m256 xvec; // 存储x向量的8个元素
__m256 avec; // 存储A矩阵的8个元素

for (int i = 0; i < N; i++) { // 遍历矩阵的每一行
bvec = _mm256_set1_ps(0.0f); // 初始化累加向量为[0,0,0,0,0,0,0,0]

for (int j = 0; j < N; j += 8) { // 每次处理8个元素
avec = _mm256_loadu_ps(&A[i * N + j]); // 加载A[i][j]到A[i][j+7]
xvec = _mm256_loadu_ps(&x[j]); // 加载x[j]到x[j+7]

// FMA指令:融合乘加运算
bvec = _mm256_fmadd_ps(avec, xvec, bvec); // bvec = avec*xvec + bvec
}

// 将8个部分和归约为单个标量
_mm256_storeu_ps(&bvecarr[0], bvec); // 将向量存储到数组

// 树状归约优化指令级并行性
bvecarr[0] += bvecarr[4]; // 第一层:[0+4, 1+5, 2+6, 3+7]
bvecarr[1] += bvecarr[5];
bvecarr[2] += bvecarr[6];
bvecarr[3] += bvecarr[7];

bvecarr[0] += bvecarr[2]; // 第二层:[0+2, 1+3]
bvecarr[1] += bvecarr[3];

b[i] = bvecarr[0] + bvecarr[1]; // 最终结果
}

LAB4 矩阵转置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
#include <iostream>
#include <iomanip>
#include <vector>
#include <cstring>
#include <cstdlib>
#include "immintrin.h" // AVX内部函数头文件
#include <chrono> // 高精度计时
#include "omp.h" // OpenMP并行编程

#define NREPET 1001 // 重复执行次数,用于准确测量性能

void printUsage(int argc, char **argv)
{
printf("Usage: %s N\n", argv[0]);
printf("Example: %s 1024\n", argv[0]);
}

// 使用AVX指令加载8×8矩阵块到向量寄存器
// 每个__m256向量存储一行的8个浮点数
inline void loadTile(__m256 tile[], float *addr, int N)
{
tile[0] = _mm256_load_ps(addr);
tile[1] = _mm256_load_ps(addr + N);
tile[2] = _mm256_load_ps(addr + 2 * N);
tile[3] = _mm256_load_ps(addr + 3 * N);
tile[4] = _mm256_load_ps(addr + 4 * N);
tile[5] = _mm256_load_ps(addr + 5 * N);
tile[6] = _mm256_load_ps(addr + 6 * N);
tile[7] = _mm256_load_ps(addr + 7 * N);
}

// 将向量寄存器中的8×8矩阵块存储回内存
// 与loadTile相对应,将计算结果写回原矩阵
inline void storeTile(__m256 tile[], float *addr, int N)
{
_mm256_store_ps(addr, tile[0]);
_mm256_store_ps(addr + N, tile[1]);
_mm256_store_ps(addr + 2 * N, tile[2]);
_mm256_store_ps(addr + 3 * N, tile[3]);
_mm256_store_ps(addr + 4 * N, tile[4]);
_mm256_store_ps(addr + 5 * N, tile[5]);
_mm256_store_ps(addr + 6 * N, tile[6]);
_mm256_store_ps(addr + 7 * N, tile[7]);
}

void printMatrix(const float *mat, int N)
{
for (int i = 0; i < N; i++) {
for (int j = 0; j < N; j++) {
printf("%3.f ", mat[i * N + j]);
}
printf("\n");
}
printf("\n");
}

void printTile(__m256 tile[8])
{
float temp[64] __attribute__((aligned(32)));
storeTile(tile, temp, 8);
printMatrix(&temp[0], 8);
}

// 验证矩阵转置结果的正确性
// 转置后的矩阵应满足:B[i][j] = A[j][i]
// 在本例中,A[i][j] = i-j,所以B[i][j] = j-i
void verify(const float *mat, int N)
{
int correct = 1;
for (int i = 0; i < N; i++) {
for (int j = 0; j < N; j++) {
if (mat[i * N + j] != j - i) { // 检查转置结果是否正确
printf("mat(%d, %d) = %.f is incorrect; mat(%d, %d) should be %.f\n", i, j, mat[i * N + j], i, j, (float)(j - i));
correct = 0;
break;
}
}
}
if (correct) {
printf("The result is correct!\n\n");
} else {
printf("The result is not correct!\n\n");
}
}


// AVX优化的8×8矩阵转置核心函数
// 使用三步法实现高效的向量化转置:
// 1. unpack: 交错排列相邻行的元素
// 2. shuffle: 重新排列4元素组
// 3. permute: 交换上下128位部分
inline void transAVX8x8_ps(__m256 tile[8])
{
__m256 tile2[8]; // 第一步中间结果
__m256 tile3[8]; // 第二步中间结果

// 第一步:使用unpack指令交错排列相邻行的元素
// unpacklo/hi将两个向量的低/高位元素交错排列
tile2[0] = _mm256_unpacklo_ps(tile[0], tile[1]); // [a0,b0,a1,b1,a4,b4,a5,b5]
tile2[1] = _mm256_unpackhi_ps(tile[0], tile[1]); // [a2,b2,a3,b3,a6,b6,a7,b7]
tile2[2] = _mm256_unpacklo_ps(tile[2], tile[3]); // [c0,d0,c1,d1,c4,d4,c5,d5]
tile2[3] = _mm256_unpackhi_ps(tile[2], tile[3]); // [c2,d2,c3,d3,c6,d6,c7,d7]
tile2[4] = _mm256_unpacklo_ps(tile[4], tile[5]); // [e0,f0,e1,f1,e4,f4,e5,f5]
tile2[5] = _mm256_unpackhi_ps(tile[4], tile[5]); // [e2,f2,e3,f3,e6,f6,e7,f7]
tile2[6] = _mm256_unpacklo_ps(tile[6], tile[7]); // [g0,h0,g1,h1,g4,h4,g5,h5]
tile2[7] = _mm256_unpackhi_ps(tile[6], tile[7]); // [g2,h2,g3,h3,g6,h6,g7,h7]

// 第二步:使用shuffle指令重新排列4元素组
// shuffle在每个128位通道内重新排列元素
tile3[0] = _mm256_shuffle_ps(tile2[0], tile2[2], 0b01000100); // [a0,b0,c0,d0,a4,b4,c4,d4]
tile3[1] = _mm256_shuffle_ps(tile2[0], tile2[2], 0b11101110); // [a1,b1,c1,d1,a5,b5,c5,d5]
tile3[2] = _mm256_shuffle_ps(tile2[1], tile2[3], _MM_SHUFFLE(1, 0, 1, 0)); // [a2,b2,c2,d2,a6,b6,c6,d6]
tile3[3] = _mm256_shuffle_ps(tile2[1], tile2[3], _MM_SHUFFLE(3, 2, 3, 2)); // [a3,b3,c3,d3,a7,b7,c7,d7]
tile3[4] = _mm256_shuffle_ps(tile2[4], tile2[6], _MM_SHUFFLE(1, 0, 1, 0)); // [e0,f0,g0,h0,e4,f4,g4,h4]
tile3[5] = _mm256_shuffle_ps(tile2[4], tile2[6], _MM_SHUFFLE(3, 2, 3, 2)); // [e1,f1,g1,h1,e5,f5,g5,h5]
tile3[6] = _mm256_shuffle_ps(tile2[5], tile2[7], _MM_SHUFFLE(1, 0, 1, 0)); // [e2,f2,g2,h2,e6,f6,g6,h6]
tile3[7] = _mm256_shuffle_ps(tile2[5], tile2[7], _MM_SHUFFLE(3, 2, 3, 2)); // [e3,f3,g3,h3,e7,f7,g7,h7]

// 第三步:使用permute2f128交换上下128位部分,完成最终转置
// 0x20: 取第一个向量的低128位和第二个向量的低128位
// 0x31: 取第一个向量的高128位和第二个向量的高128位
tile[0] = _mm256_permute2f128_ps(tile3[0], tile3[4], 0x20); // [a0,b0,c0,d0,e0,f0,g0,h0]
tile[1] = _mm256_permute2f128_ps(tile3[1], tile3[5], 0x20); // [a1,b1,c1,d1,e1,f1,g1,h1]
tile[2] = _mm256_permute2f128_ps(tile3[2], tile3[6], 0x20); // [a2,b2,c2,d2,e2,f2,g2,h2]
tile[3] = _mm256_permute2f128_ps(tile3[3], tile3[7], 0x20); // [a3,b3,c3,d3,e3,f3,g3,h3]
tile[4] = _mm256_permute2f128_ps(tile3[0], tile3[4], 0x31); // [a4,b4,c4,d4,e4,f4,g4,h4]
tile[5] = _mm256_permute2f128_ps(tile3[1], tile3[5], 0x31); // [a5,b5,c5,d5,e5,f5,g5,h5]
tile[6] = _mm256_permute2f128_ps(tile3[2], tile3[6], 0x31); // [a6,b6,c6,d6,e6,f6,g6,h6]
tile[7] = _mm256_permute2f128_ps(tile3[3], tile3[7], 0x31); // [a7,b7,c7,d7,e7,f7,g7,h7]
}

// 将矩阵A的8×8子块转置后存储到矩阵B的对应位置
// 这是最基本的8×8转置操作,从源矩阵读取,转置后写入目标矩阵
inline void transAVX8x8(float *tA, float *tB, __m256 tile[8], int N)
{
loadTile(tile, tA, N); // 从源矩阵A加载8×8块
transAVX8x8_ps(tile); // 执行AVX转置操作
storeTile(tile, tB, N); // 将转置结果存储到目标矩阵B
}

// 原地转置:交换两个对称位置的8×8子块并各自转置
// 这种方法可以节省一半的内存空间,适用于方阵的原地转置
inline void transAVX8x8InPlace(float *tA, float *tA2, __m256 tile[8], __m256 tile2[8], int N)
{
loadTile(tile, tA, N); // 加载位置(i,j)的8×8块
loadTile(tile2, tA2, N); // 加载位置(j,i)的8×8块
transAVX8x8_ps(tile); // 转置第一个块
transAVX8x8_ps(tile2); // 转置第二个块
storeTile(tile, tA2, N); // 将第一个块的转置结果存储到位置(j,i)
storeTile(tile2, tA, N); // 将第二个块的转置结果存储到位置(i,j)
}

int main(int argc, char **argv)
{
// 获取命令行参数:矩阵大小N
if (argc != 2) {
printUsage(argc, argv);
return 0;
}
int N = std::atoi(argv[1]);
__m256 tile[8], tile2[8]; // AVX向量寄存器,用于存储8×8块


// 分配32字节对齐的内存并初始化矩阵A和B
// 对齐内存可以提高AVX指令的性能
float *A = (float *)_mm_malloc(N * N * sizeof(float), 32);
float *B = (float *)_mm_malloc(N * N * sizeof(float), 32);
for (int i = 0; i < N; i++) {
for (int j = 0; j < N; j++) {
A[i * N + j] = i - j; // A[i][j] = i - j,转置后应该是B[i][j] = j - i
B[i * N + j] = 0; // 初始化B矩阵为0
}
}


// 方法1:传统的顺序标量转置(基准版本)
// 简单的双层循环,逐个元素进行转置:B[i][j] = A[j][i]
{
auto start = std::chrono::high_resolution_clock::now();
// TODO / A FAIRE ...
for (int repet = 0; repet < NREPET; repet++) {
for (int i = 0; i < N; i++) {
for (int j = 0; j < N; j++) {
B[i * N + j] = A[j * N + i]; // 标量转置操作
}
}
}
std::chrono::duration<double> time = std::chrono::high_resolution_clock::now() - start;
std::cout << "Sequential transpose: " << 1000 * time.count() / NREPET << "ms\n";
std::cout << "Performance: " << (long long) N * N * sizeof(float) / (1e9 * time.count() / NREPET) << "GB/s\n";
verify(B, N);
}


// 方法2:8×8分块的AVX向量化转置
// 将矩阵分成8×8的小块,使用AVX指令进行高效转置
// 优势:向量化处理8个数据,更好的缓存局部性
{
memset(B, 0, N * N * sizeof(float)); // 清零目标矩阵
auto start = std::chrono::high_resolution_clock::now();
// TODO / A FAIRE ...
for (int repet = 0; repet < NREPET; repet++) {
for (int i = 0; i < N; i += 8) { // 按8×8块遍历行
for (int j = 0; j < N; j += 8) { // 按8×8块遍历列
// 将A[i:i+8, j:j+8]转置后存储到B[j:j+8, i:i+8]
transAVX8x8(&A[i * N + j], &B[j * N + i], tile, N);
}
}
}
std::chrono::duration<double> time = std::chrono::high_resolution_clock::now() - start;
std::cout << "AVX transpose: " << 1000 * time.count() / NREPET << "ms\n";
std::cout << "Performance: " << (long long) N * N * sizeof(float) / (1e9 * time.count() / NREPET) << "GB/s\n";
verify(B, N);
}


// 方法3:原地AVX转置(内存优化版本)
// 只处理矩阵的上三角部分,通过交换对称位置的8×8块实现原地转置
// 优势:节省一半内存空间,减少内存访问次数
{
memcpy(B, A, N * N * sizeof(float));
auto start = std::chrono::high_resolution_clock::now();
// TODO / A FAIRE ...
for (int repet = 0; repet < NREPET; repet++) {
for (int i = 0; i < N; i += 8) {
for (int j = 0; j <= i; j += 8) { // 只处理上三角和对角线块
// 交换并转置位置(i,j)和(j,i)的8×8块
transAVX8x8InPlace(&A[i * N + j], &A[j * N + i], tile, tile2, N);
}
}
}
std::chrono::duration<double> time = std::chrono::high_resolution_clock::now() - start;
std::cout << "AVX in-place transpose: " << 1000 * time.count() / NREPET << "ms\n";
std::cout << "Performance: " << (long long) N * N * sizeof(float) / (1e9 * time.count() / NREPET) << "GB/s\n";
verify(A, N);
memcpy(A, B, N * N * sizeof(float)); // 恢复原矩阵
}


// 方法4:OpenMP任务并行的原地AVX转置(终极优化版本)
// 结合了AVX向量化、原地转置和任务并行的所有优化技术
// 使用128×128的大块划分任务,避免任务粒度过小的开销
{
memcpy(B, A, N * N * sizeof(float)); // 备份原矩阵
auto start = std::chrono::high_resolution_clock::now();
int B1 = 128; // 每个任务处理的块大小(128×128)
for (int repet = 0; repet < NREPET; repet++) {
#pragma omp parallel default(none) shared(A, N, B1, tile, tile2)
{
// 并行生成任务:每个大块独立,可以并行创建任务
#pragma omp for collapse(2)
for (int i = 0; i < N; i += B1) {
for (int j = 0; j <= i; j += B1) { // 只处理上三角部分
// 创建处理128×128块的任务
#pragma omp task default(none) shared(A, N, B1) firstprivate(i, j) private(tile, tile2)
{
int i1max = std::min(i + B1, N);
// 在128×128块内按8×8进行原地转置
for (int i1 = i; i1 < i1max; i1 += 8) {
int j1max = std::min(j + B1, i1 + 1); // 确保只处理上三角部分
for (int j1 = j; j1 < j1max; j1 += 8) {
// 交换并转置8×8子块
transAVX8x8InPlace(&A[i1 * N + j1], &A[j1 * N + i1], tile, tile2, N);
}
}
}
}
}
}
}
std::chrono::duration<double> time = std::chrono::high_resolution_clock::now() - start;
std::cout << "AVX in-place transpose with OpenMP tasks: " << 1000 * time.count() / NREPET << "ms\n";
std::cout << "Performance: " << (long long) N * N * sizeof(float) / (1e9 * time.count() / NREPET) << "GB/s\n";
verify(A, N);
memcpy(A, B, N * N * sizeof(float)); // 恢复原矩阵
}


// 释放内存
_mm_free(A);
_mm_free(B);

return 0;
}

LAB5 矩阵乘法

截屏2025-10-19 17.41.18

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
CPU核心

L1缓存 (Level 1) - 最快,最小

L2缓存 (Level 2) - 较快,较大

L3缓存 (Level 3) - 慢一些,更大

主内存 (RAM) - 最慢,最大


// 单级分块
整个矩阵直接分成32×32的小块
┌─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┐
32×32│32×3232×32│32×3232×32│32×3232×32│32×32
├─────┼─────┼─────┼─────┼─────┼─────┼─────┼─────┤
32×32│32×3232×32│32×3232×32│32×3232×32│32×32
├─────┼─────┼─────┼─────┼─────┼─────┼─────┼─────┤
32×32│32×3232×32│32×3232×32│32×3232×32│32×32
...

// 双级分块
先分成256×256大块,再在每个大块内分成32×32小块
┌─────────────────┬─────────────────┐
256×256256×256
│ ┌──┬──┬──┬──┐ │ ┌──┬──┬──┬──┐ │
│ │32│32│32│32│ │ │32│32│32│32│ │
│ ├──┼──┼──┼──┤ │ ├──┼──┼──┼──┤ │
│ │32│32│32│32│ │ │32│32│32│32│ │
│ └──┴──┴──┴──┘ │ └──┴──┴──┴──┘ │
├─────────────────┼─────────────────┤
256×256256×256
└─────────────────┴─────────────────┘
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
/*
(b)Question?
i→k→j is faster

for exemple: we have 3x3 matrix A,B,C
Matrix A: [A00, A01, A02, A10, A11, A12, A20, A21, A22]
Matrix B: [B00, B01, B02, B10, B11, B12, B20, B21, B22]
Matrix C: [C00, C01, C02, C10, C11, C12, C20, C21, C22]

i→j→k Order Access:
When computing C[0][0] (i=0, j=0):
k=0: Access A[0], B[0] → A00, B00
k=1: Access A[1], B[3] → A01, B10
k=2: Access A[2], B[6] → A02, B20

Matrix B is accessed with large jumps (B[0], B[3], B[6])
which leads to poor cache locality for large matrices.



i→k→j Order Access:
When computing C[0][0] (i=0, k=0):
j=0: Access A[0], B[0] → A00, B00
j=1: Access A[0], B[1] → A00, B01
j=2: Access A[0], B[2] → A00, B02

Matrix B is accessed sequentially (B[0], B[1], B[2]),
which improves cache locality and performance.




===================================================
Result:

Baseline version (i→j→k): 5.707559s, 375.89 Mflops/s
Final optimized version: 0.15s, 14278.94 Mflops/s

Final Speedup = 5.707559 / 0.15 = 38.05x

===================================================

Theoretical Peak = Cores × Frequency × FMA Operations × SIMD Width
= 4 × 2.30GHz × 2 × 8 = 147.2 Gflops/s

Actual Performance: 14.28 Gflops/s
Theoretical Peak: 147.2 Gflops/s
Peak Efficiency: 14.28 / 147.2 = 9.7%

===================================================

zhuoyuanjin@c8cpu16g:~/HPC$ ./matmat-avx 1024
Sequential scalar matmat i->j->k took 5.707559e+00s.
Performance: 375.89Mflops/s
The result is correct!

Sequential scalar matmat i->k->j took 2.78e+00s.
Performance: 771.02Mflops/s
The result is correct!

Single tile scalar matmat i->k->j took 2.97e+00s.
Performance: 721.39Mflops/s
The result is correct!

Double tile scalar matmat i->k->j took 2.98e+00s.
Performance: 719.98Mflops/s
The result is correct!

Double tile AVX matmat i->k->j took 6.33e-01s.
Performance: 3391.35Mflops/s
The result is correct!

Task parallel double tile AVX matmat i->k->j took 1.50e-01s.
Performance: 14278.94Mflops/s
The result is correct!

*/

#include <iostream>
#include <iomanip>
#include <vector>
#include <cstring>
#include <cstdlib>
#include "immintrin.h"
#include <chrono>
#include "omp.h"

#define NREPEAT 10

void printUsage(int argc, char **argv)
{
printf("Usage: %s N\n", argv[0]);
printf("Example: %s 1024\n", argv[0]);
}

void verify(const float *A, const float *B, const float *C, int N)
{
int correct = 1;
for (int i = 0; i < N; i++) {
for (int j = 0; j < N; j++) {
if (C[i * N + j] != N) {
printf("C(%d, %d) = %f is incorrect; C(%d, %d) should be %d\n", i, j, C[i * N + j], i, j, N);
correct = 0;
break;
}
}
}
if (correct) {
printf("The result is correct!\n\n");
} else {
printf("The result is not correct!\n\n");
}
}

inline void loadTile(__m256 tile[8], float *addr, int N)
{
// Load the 8x8 subtile from memory whose upper left element is addr[0] in an NxN matrix
// TODO / A FAIRE ...
// 加载8行,每行8个连续元素
// (e)
for (int i = 0; i < 8; i++) {
tile[i] = _mm256_load_ps(&addr[i * N]);
}

}

inline void storeTile(__m256 tile[8], float *addr, int N)
{
// Store the 8x8 subtile into memory whose upper left element is addr[0], in an NxN matrix
// TODO / A FAIRE ...
// (e)
for (int i = 0; i < 8; i++) {
_mm256_store_ps(&addr[i * N], tile[i]);
}

}

inline void multiplyTile(float *tA, float *tB, float *tC, __m256 atile[8], __m256 btile[8], __m256 ctile[8], int N)
{
loadTile(btile, tB, N); // 加载矩阵B的8*8块到寄存器中
loadTile(ctile, tC, N); // 加载矩阵C
for (int i = 0; i < 8; i++) { // 处理矩阵C的第i行
for (int k = 0; k < 8; k++){ // 矩阵A的第i行第k列元素
// A的第一行要乘以B的每一列然后累加变成C的第一行
__m256 a_broadcast = _mm256_broadcast_ss(&tA[i * N + k]); // 广播A[i][k],e.g.广播A00变成[A00,A00,A00,A00,A00,A00,A00,A00]
ctile[i] = _mm256_fmadd_ps(a_broadcast, btile[k], ctile[i]); // C00=A00*[B00,B01,B02,B03,B04,B05,B06,B07]
}
}
storeTile(ctile, tC, N);
}

int main(int argc, char **argv)
{
if (argc != 2) {
printUsage(argc, argv);
return 0;
}
int N = std::atoi(argv[1]);
const int B1 = 32;
const int B2 = 256;

// Allocate and initialize the matrix A and vectors x, b
// Allouer et initialiser la matrice A et matrices x, b
float *A = (float *)_mm_malloc(N * N * sizeof(float), 32);
float *B = (float *)_mm_malloc(N * N * sizeof(float), 32);
float *C = (float *)_mm_malloc(N * N * sizeof(float), 32);
for (int i = 0; i < N; i++) {
for (int j = 0; j < N; j++) {
A[i * N + j] = 1.0f;
B[i * N + j] = 1.0f;
C[i * N + j] = 0.0f;
}
}

// Sequential and scalar matrix-matrix multiplication code with loop order i->j->k
// Code sequentiel et scalaire produit matrice-matrice avec l'ordre de boucles i->j->k
{
auto start = std::chrono::high_resolution_clock::now();
// TODO / A FAIRE ...
// (a) i->j->k
for (int i = 0; i < N; i++){
for (int j = 0; j < N; j++){
for (int k = 0; k < N; k++){
C[i * N + j] += A[i * N + k] * B[k * N + j];
}
}
}

std::chrono::duration<double> timeDiff = std::chrono::high_resolution_clock::now() - start;
std::cout << std::scientific << "Sequential scalar matmat i->j->k took " << timeDiff.count() << "s." << std::endl;
std::cout << std::fixed << std::setprecision(2) << "Performance: " << 2.0*N*N*(N-1) / ((1e6) * timeDiff.count()) <<
"Mflops/s" << std::endl;
verify(A, B, C, N);
}

// Sequential and scalar matrix-matrix multiplication code with loop order i->k->j
// Code sequentiel et scalaire produit matrice-matrice avec l'ordre de boucles i->k->j
{
auto start = std::chrono::high_resolution_clock::now();
for (int repeat = 0; repeat < NREPEAT; repeat++) {
memset(&C[0], 0, N * N * sizeof(float));
// TODO / A FAIRE ...
// (b) i->k->j
for (int i = 0; i < N; i++){
for (int k = 0; k < N; k++){
for (int j = 0; j < N; j++){
C[i * N + j] += A[i * N + k] * B[k * N + j];
}
}
}
}
std::chrono::duration<double> timeDiff = (std::chrono::high_resolution_clock::now() - start) / NREPEAT;
std::cout << std::scientific << "Sequential scalar matmat i->k->j took " << timeDiff.count() << "s." << std::endl;
std::cout << std::fixed << std::setprecision(2) << "Performance: " << 2.0*N*N*(N-1) / ((1e6) * timeDiff.count()) <<
"Mflops/s" << std::endl;
verify(A, B, C, N);
}

// Sequential and scalar matrix-matrix multiplication code with loop order i->k->j and single level tiling
// Code sequentiel et scalaire produit matrice-matrice avec l'ordre de boucles i->k->j et tuilage d'un niveau
{
auto start = std::chrono::high_resolution_clock::now();
for (int repeat = 0; repeat < NREPEAT; repeat++) {
memset(&C[0], 0, N * N * sizeof(float));
// TODO / A FAIRE ...
//(c) B1=32 单级分块
for (int i = 0; i < N; i += B1) { //每次挑32行
for (int k = 0; k < N; k += B1) { //每次挑32列
for (int j = 0; j < N; j += B1) { //每次挑32列
// 获取当前32x32块的起始地址
float *tA = &A[i * N + k];
float *tB = &B[k * N + j];
float *tC = &C[i * N + j];
//在32x32块内进行乘法运算
for (int i2 = 0; i2 < B1; i2++) {
for (int k2 = 0; k2 < B1; k2++) {
for (int j2 = 0; j2 < B1; j2++) {
tC[i2 * N + j2] += tA[i2 * N + k2] * tB[k2 * N + j2];
}
}
}
}
}
}
}
std::chrono::duration<double> timeDiff = (std::chrono::high_resolution_clock::now() - start) / NREPEAT;
std::cout << std::scientific << "Single tile scalar matmat i->k->j took " << timeDiff.count() << "s." << std::endl;
std::cout << std::fixed << std::setprecision(2) << "Performance: " << 2.0*N*N*(N-1) / ((1e6) * timeDiff.count()) <<
"Mflops/s" << std::endl;
verify(A, B, C, N);
}


// Sequential and scalar matrix-matrix multiplication code with loop order i->k->j and two level tiling
// Code sequentiel et scalaire produit matrice-matrice avec l'ordre de boucles i->k->j et tuilage de deux niveaux
{
auto start = std::chrono::high_resolution_clock::now();
for (int repeat = 0; repeat < NREPEAT; repeat++) {
memset(&C[0], 0, N * N * sizeof(float));
// TODO / A FAIRE ...
// (d) B2=256 et B1=32 双极分块
// 256*256
for (int i = 0; i < N; i += B2) {
for (int k = 0; k < N; k += B2) {
for (int j = 0; j < N; j += B2) {

// 32*32
for (int i1 = 0; i1 < B2; i1 += B1) {
for (int k1 = 0; k1 < B2; k1 += B1) {
for (int j1 = 0; j1 < B2; j1 += B1) {

float *tA = &A[(i + i1) * N + (k + k1)];
float *tB = &B[(k + k1) * N + (j + j1)];
float *tC = &C[(i + i1) * N + (j + j1)];

// multiplication in the 32*32 block
for (int i2 = 0; i2 < B1; i2++) {
for (int k2 = 0; k2 < B1; k2++) {
for (int j2 = 0; j2 < B1; j2++) {
tC[i2 * N + j2] += tA[i2 * N + k2] * tB[k2 * N + j2];
}
}
}
}
}
}
}
}
}
}
std::chrono::duration<double> timeDiff = (std::chrono::high_resolution_clock::now() - start) / NREPEAT;
std::cout << std::scientific << "Double tile scalar matmat i->k->j took " << timeDiff.count() << "s." << std::endl;
std::cout << std::fixed << std::setprecision(2) << "Performance: " << 2.0*N*N*(N-1) / ((1e6) * timeDiff.count()) <<
"Mflops/s" << std::endl;
verify(A, B, C, N);
}


// Vectorized matrix-matrix multiplication code with loop order i->k->j and two level tiling + AVX
// Produit matrice-matrice vectorise avec l'ordre de boucles i->k->j et tuilage de deux niveaux + AVX
{
auto start = std::chrono::high_resolution_clock::now();
for (int repeat = 0; repeat < NREPEAT; repeat++) {
memset(&C[0], 0, N * N * sizeof(float));
// 为每个线程分配8个寄存器空间
__m256 atile[8], btile[8], ctile[8];
// TODO / A FAIRE ...
// (e)双层分块+AVX
// 第一层256*256
for (int i = 0; i < N; i += B2) {
for (int k = 0; k < N; k += B2) {
for (int j = 0; j < N; j += B2) {
// 第二层32*32
for (int i1 = 0; i1 < B2; i1 += B1) {
for (int k1 = 0; k1 < B2; k1 += B1) {
for (int j1 = 0; j1 < B2; j1 += B1) {
// 变成8*8的子块进行AVX运算
for (int i2 = 0; i2 < B1; i2 += 8) {
for (int k2 = 0; k2 < B1; k2 += 8) {
for (int j2 = 0; j2 < B1; j2 += 8) {

float *tA = &A[(i + i1 + i2) * N + (k + k1 + k2)];
float *tB = &B[(k + k1 + k2) * N + (j + j1 + j2)];
float *tC = &C[(i + i1 + i2) * N + (j + j1 + j2)];

multiplyTile(tA, tB, tC, atile, btile, ctile, N);
}
}
}
}
}
}
}
}
}
}

std::chrono::duration<double> timeDiff = (std::chrono::high_resolution_clock::now() - start) / NREPEAT;
std::cout << std::scientific << "Double tile AVX matmat i->k->j took " << timeDiff.count() << "s." << std::endl;
std::cout << std::fixed << std::setprecision(2) << "Performance: " << 2.0*N*N*(N-1) / ((1e6) * timeDiff.count()) <<
"Mflops/s" << std::endl;
verify(A, B, C, N);
}


// Task-parallel and vectorized matrix-matrix multiplication code with loop order i->k->j and two level tiling + AVX
// Produit matrice-matrice vectorise et parallelise par taches avec l'ordre de boucles i->k->j et tuilage de deux
// niveaux+AVX
{
auto start = std::chrono::high_resolution_clock::now();
for (int repeat = 0; repeat < NREPEAT; repeat++) {
memset(&C[0], 0, N * N * sizeof(float));
// TODO / A FAIRE ...
// (f)双层分块+AVX+OpenMP任务并行

#pragma omp parallel
{
#pragma omp single
{
// 第一层256*256
for (int i = 0; i < N; i += B2) {
for (int k = 0; k < N; k += B2) {
for (int j = 0; j < N; j += B2) {

// 每个任务处理一个256*256的大块,e.g.对于1024,一共16个任务
#pragma omp task depend(inout: C[i*N+j:B2*N]) depend(in: A[i*N+k:B2*N], B[k*N+j:B2*N]) firstprivate(i,j,k)
{
__m256 local_atile[8], local_btile[8], local_ctile[8];

// 第二层32*32
for (int i1 = 0; i1 < B2; i1 += B1) {
for (int k1 = 0; k1 < B2; k1 += B1) {
for (int j1 = 0; j1 < B2; j1 += B1) {
// 变成8*8的子块进行AVX运算
for (int i2 = 0; i2 < B1; i2 += 8) {
for (int k2 = 0; k2 < B1; k2 += 8) {
for (int j2 = 0; j2 < B1; j2 += 8) {

float *tA = &A[(i + i1 + i2) * N + (k + k1 + k2)];
float *tB = &B[(k + k1 + k2) * N + (j + j1 + j2)];
float *tC = &C[(i + i1 + i2) * N + (j + j1 + j2)];

multiplyTile(tA, tB, tC, local_atile, local_btile, local_ctile, N);
}
}
}
}
}
}
}
}
}
}
#pragma omp taskwait
}
}
}
std::chrono::duration<double> timeDiff = (std::chrono::high_resolution_clock::now() - start) / NREPEAT;
std::cout << std::scientific << "Task parallel double tile AVX matmat i->k->j took " << timeDiff.count() << "s." << std::endl;
std::cout << std::fixed << std::setprecision(2) << "Performance: " << 2.0*N*N*(N-1) / ((1e6) * timeDiff.count()) <<
"Mflops/s" << std::endl;
verify(A, B, C, N);
}

// Free matrices
// Desallouer les matrices
_mm_free(A);
_mm_free(B);
_mm_free(C);

return 0;
}