Why Parallelism? Why Efficiency?

observation:

  • communication limits the maximun speedup of parallel
  • imbalance of work load limits the speedup
  • communication can be the domain limits

历史上cpu性能提升极快,并行没那么有用
近年来,并行计算和提高时钟周期极大提升性能

计算机视角:程序只是一段指令序列
alt text

  • superscalar processor execution
    processor自动去找彼此独立的指令来做并行

  • instruction level parallelism(ILP)
    通常到4就上不去了(不专门设计的程序的并行性有限)
    很多地方会使用专用处理器(NPU,TPU,GPU)

  • memory:just an array of bytes(dram这些是具体的implementation)

A Modern Multi-Core Processor (Part I)

示例:利用泰勒近似计算数组中每个值的sin 函数值

for (int i = 0; i < n; i++) {
y[i] = sin(x[i]); // 通过泰勒级数近似计算sin(x[i])
}

内部的并行性是有限的,但是每个循环所操作的值完全不同,这种复杂的并行性很难被超标量处理器做到

  • 1.多核处理器
    减少单个处理器的复杂程度,给处理器设置多个核(实际上将并行的责任推高到了代码层次,通过多线程实现)
    alt text
  • 2.单指令多数据(SIMD)
    给单个处理核心配备多个算术单元,并配备相应的向量寄存器和向量处理指令
    提高吞吐量,但是当遇到分支控制语句的时候可能导致空闲(同一时间各ALU需要执行相同指令)
    alt text
    alt text
  • 3.多进程与上下文切换
    单核心内部复制多个上下文,当某个任务阻塞时(如IO,缓存未命中)切换到另一个任务
    data prefetching:预测未来哪些数据会被使用并提前读取

Multi-core Arch Part II + ISPC Programming Abstractions

alt text
superscalar:多个fetch decode和多个ALU
multi-thread:多个寄存器上下文
SIMD:向量ALU和寄存器

memory bandwidth:the rate at which memory can provide data to processor
latency:the time of transfer some thing(decoupled with bandwidth)
rate of instruction is always determined by bandwidth
alt text
区分:latency and throughput(流水线可以使latency更高的指令吞吐量提高)
alt text

abstraction vs implementation

alt text

ISPC:Intel SIMD programming compiler
call to ISPC function just create a bunch of program instances
all instances run concurrently,has own local variable
keyword of foreach:the iteration must be down by the entire bunch of instances

PA1

prog1

alt text

  • gdb:编译的时候使用-g开启调试
    gcc编译分为编译和链接两个阶段,我们的-g和-O优化选项一定要放在编译时
    负载不均衡:看上去均分的三个区域,因为有些区域的值可以提前跳出,导致某些线程所用时间非常大,总体很慢
    计时:在workerthreadstart(子线程)正式计算前后计数

  • spatial decompositon:将任务分解为空间中相邻的算法
    两种分解方法!!!
    alt text
    alt text
    为了平衡利用缓存

//
// workerThreadStart --
//
// Thread entrypoint.
void workerThreadStart(WorkerArgs * const args) {

double t_startTime = CycleTimer::currentSeconds();

const unsigned int CHUNK_SIZE = 16; // 或 8、32 等,需要测试最优值

// 每次处理多行,减少函数调用开销
for (unsigned int cur_row = args->threadId * CHUNK_SIZE;
cur_row < args->height;
cur_row += args->numThreads * CHUNK_SIZE) {

// 确保不会越界
int numRows = std::min(CHUNK_SIZE,
args->height - cur_row);

mandelbrotSerial(args->x0, args->y0, args->x1, args->y1,
args->width, args->height,
cur_row, numRows,
args->maxIterations, args->output);
}

double t_endTime = CycleTimer::currentSeconds();
printf("Thread %d: [%.3f] ms\n", args->threadId,
(t_endTime - t_startTime) * 1000);
}

prog2

//无向量对应循环,使用递归,只要当前还有一位满足条件就还得循环
void recurssive_while(__cs149_vec_int& count,__cs149_vec_float&result,__cs149_vec_float&input,__cs149_mask&NotExpZero){
_cs149_vgt_int(NotExpZero,count,IntZero,NotExpZero);
if(_cs149_cntbits(NotExpZero)==0)return;
_cs149_vmult_float(result,result,input,NotExpZero);
_cs149_vsub_int(count,count,IntOne,NotExpZero);
recurssive_while(count,result,input,NotExpZero);
}
void clampedExpVector(float* values, int* exponents, float* output, int N) {
__cs149_mask IsExpZero;
__cs149_mask NotExpZero;
__cs149_vec_int exp;
__cs149_vec_float val;
__cs149_mask IsOver;
for(int i=0;i<N;i+=VECTOR_WIDTH){
__cs149_mask Init;
int pos=i;
Init=_cs149_init_ones(min(VECTOR_WIDTH,N-i));//直接仅仅只加载几个,后面的直接不管,只要读取和写入逻辑正确就是对的
_cs149_vload_float(val,values+pos,Init);
_cs149_vload_int(exp,exponents+pos,Init);
_cs149_vset_float(ret,1.f,Init);
//if y==0 out=1
_cs149_vlt_int(IsExpZero,exp,IntOne,Init);
//else
NotExpZero =_cs149_mask_not(IsExpZero);
_cs149_vmove_float(ret,val,NotExpZero);
__cs149_vec_int count;
_cs149_vsub_int(count,exp,IntOne,NotExpZero);
//while
recurssive_while(count,ret,val,NotExpZero);
_cs149_vgt_float(IsOver,ret,upper,Init);
_cs149_vset_float(ret,9.999999f,IsOver);
_cs149_vstore_float(output+pos,ret,Init);
}
}

prog3

  • ispc basic:
//export:表示该ispc函数是被c/cpp调用的
//uniform:表示该变量在所有的program instances保持不变(从c传出来的参数必须uniform)
//数组参数和c一样,会被转化为指向第一个元素的指针
//foreach:是通过声明利用SIMD

export void mandelbrot_ispc(uniform float x0, uniform float y0,
uniform float x1, uniform float y1,
uniform int width, uniform int height,
uniform int maxIterations,
uniform int output[])
{
float dx=(x1-x0)/width;
float dy=(y1-y0)/height;
for(uniform int j=0;j<height;j++){
foreach(i=0...width){
float x = x0 + i * dx;
float y = y0 + j * dy;
int index = j * width + i;
output[index] = mandel(x, y, maxIterations);
}
}
}
//ISPC利用多线程的机制是launch task
export void mandelbrot_ispc_withtasks(uniform float x0, uniform float y0,
uniform float x1, uniform float y1,
uniform int width, uniform int height,
uniform int maxIterations,
uniform int output[])
{

uniform int rowsPerTask = height / 16;

// create 2 tasks
launch[16] mandelbrot_ispc_task(x0, y0, x1, y1,
width, height,
rowsPerTask,
maxIterations,
output);
}

区分task and multithread:
多线程你指定了开这么多就是得开这么多,开销极大
task只是往工作队列里面压入这么多
foreach:单核SIMD加速
tasks:多核并行

prog4

最大化加速比:

  • 减少随机性,让每个数据的计算量接近,SIMD跑满
  • 大量计算量,可以掩盖多进程和内存读写的开销
  • 有些时候可能存在"拔河":改变参数同时导致不同数据计算量不同的同时增大了计算量(看哪边更明显)

最小化的情景:

  • 计算开销相对而言远小于内存搬运
  • 对SIMD而言,严重的条件分支可能导致空转

AVX(是指令集,常用的是AVX intrinsinc):

  • load和store最好unaligned(你申请的内存不一定32位等对齐了)
    alt text
    alt text
    以下代码出处:zhihu
void sqrtAVX(int N, float initialGuess, float *values, float *output) {
// 输入参数有效性检查
if (!values || !output || N <= 0) return;

// 设置收敛阈值
static const float kThreshold = 0.00001f;

// 计算能被 8 整除的最大长度(AVX2 可以同时处理 8 个 float)
int aligned_n = (N / 8) * 8;

// 使用 AVX 指令并行处理每 8 个元素
for (int i = 0; i < aligned_n; i += 8) {
// 将阈值广播到 256 位向量中(8 个 float)
__m256 threshold = _mm256_set1_ps(kThreshold);
// 从内存加载 8 个连续的 float 到向量寄存器
__m256 x_vec = _mm256_loadu_ps(&values[i]);
// 将初始猜测值广播到向量寄存器
__m256 guess = _mm256_set1_ps(initialGuess);

// 计算初始误差:|guess² * x - 1|
__m256 pred = _mm256_sub_ps(
_mm256_mul_ps(_mm256_mul_ps(guess, guess), x_vec),
_mm256_set1_ps(1.0f)
);
// 计算误差的绝对值
pred = _mm256_andnot_ps(_mm256_set1_ps(-0.0f), pred);

/*
1. 在浮点数的二进制表示中,最高位(符号位)为:
- 0:表示正数
- 1:表示负数

2. `-0.0f` 的二进制表示是 `1000 0000 0000 0000 0000 0000 0000 0000`
- 即只有符号位为 1,指数和尾数位都是 0

3. `_mm256_andnot_ps(a, b)` 的操作相当于 `~a & b`,即:
- 先对第一个操作数取位反(NOT)
- 然后与第二个操作数进行位与(AND)

4. 所以 `_mm256_andnot_ps(_mm256_set1_ps(-0.0f), x)` 的过程是:

x: [sign][exponent][mantissa] (原始数)
-0.0f: 1000...000 (全 0,仅符号位为 1)
~(-0.0f): 0111...111 (全 1,仅符号位为 0)
~(-0.0f) & x: 0[exponent][mantissa] (保持数值位不变,符号位强制为 0)


这样就巧妙地将任何负数转换为对应的正数,而正数保持不变,从而实现了绝对值的计算。
*/


// 当任意一个元素的误差大于阈值时继续迭代
while (_mm256_movemask_ps(_mm256_cmp_ps(pred, threshold, _CMP_GT_OQ)) != 0) {
// _mm256_movemask_ps 是 AVX 指令集中的一个函数,用于从 256 位浮点向量中提取每个元素的符号位,并将这些符号位组合成一个 8 位的整数。
// 牛顿迭代公式:guess = (3 * guess - x * guess³) * 0.5
guess = _mm256_mul_ps(
_mm256_set1_ps(0.5f),
_mm256_sub_ps(
_mm256_mul_ps(_mm256_set1_ps(3.0f), guess),
_mm256_mul_ps(x_vec, _mm256_mul_ps(guess, _mm256_mul_ps(guess, guess)))
)
);

// 重新计算误差
pred = _mm256_sub_ps(
_mm256_mul_ps(_mm256_mul_ps(guess, guess), x_vec),
_mm256_set1_ps(1.0f)
);
pred = _mm256_andnot_ps(_mm256_set1_ps(-0.0f), pred);
}

// 计算最终结果 sqrt(x) = x * guess,并存储到输出数组
_mm256_storeu_ps(&output[i], _mm256_mul_ps(x_vec, guess));
}

// 使用标准算法处理剩余的元素(不足 8 个的部分)
for (int i = aligned_n; i < N; i++) {
float x = values[i];
float guess = initialGuess;
float error = fabs(guess * guess * x - 1.f);

while (error > kThreshold) {
guess = (3.f * guess - x * guess * guess * guess) * 0.5f;
error = fabs(guess * guess * x - 1.f);
}

output[i] = x * guess;
}
}

prog5&6

读写瓶颈:写入的时候先读入缓存行再写,再存回去,这样很耗时间,一种方法是直接写回主存
ispc:禁止foreach嵌套,传指针就使用数组符号即可

//多线程
for(int i=0;i<THREAD_NUMBER;i++){
woker[i]=std::thread(computeAssignments,&arg[i]);
}

for(int i=0;i<THREAD_NUMBER;i++){
if(woker[i].joinable())
woker[i].join();
}