Parallelizing Code

More in ISPC semantics

使用foreach:实际上是指定了任务(gang)总共迭代这么多次,编译器负责具体方案
alt text

  • 当我们试图在多个程序实例中对同一个变量进行操作(race condition)
  • 返回:应该是uniform的,返回多个程序实例的变量实际上没什么意义

e.g:计算数组之和
正确做法是在每个程序实例引入一个partial 局部变量,任务完成后将所有程序实例的partial合起来
(无论其实现逻辑是SIMD还是多线程都会产生竞争)

alt text
ispc gang is actually implemented by SIMD
写的时候像是SPMD(多个逻辑线程),但是优化的时候要考虑它实际上是SIMD
通过programindex,programcount等底层暴露才能在需要的时候优化性能
task:实际上是一个线程池(固定八个线程取任务来做)

Case study on thought process of writing and optimizing a parallel program

区分:硬件线程和操作系统线程(后者需要context switch,开销非常高,后者是一套处理单元对应多个状态,直接切换,开销低)
alt text
a small part of sequential can largely limit the speed up
key of decomposition is to identify independencies(often by programmer)
assignment:assign task to thread,vector lane etc(workload balance,reduce communication)
alt text

2d-grid solver

问题:在一个n乘n的网格中不断从上至下更新值为其周围值的平均,当变动小于阈值时收敛

  • decomposition:仅仅只有对角线元素互相是独立的,难以编写并且并行程度不高
    这种时候,我们甚至可能需要根据领域知识重写算法以得到更多并行性
    此处我们采用分两组更新(每个点和它周围的四个点分成不同的组)
  • assignment:
    (row)block or interleaved:考虑到一个core更新可能需要相邻row更新后的值,block可能更好
  • orchestration:
    lock:解决写冲突(写不是原子操作)
    barrier:解决时序冲突(有的线程先走可能导致某些变量被覆盖)
    alt text
    (当前使用diff[0],提前给下一轮用的diff[(index+1)%3],上一轮留下来diff[(index+2)%3])

optimization:work distribution and scheduling

tips:always implement the simplest solution and measure performance to determin the next

balancing the work load

  • static assignment:assign didn’t depend on dynamic behaviour
    简单,同时无需进行分配,可以减少开销(很多时候通信开销也可以削减)
    实际上在运行时间大概可预测的时候(统计意义上的预测也行)很有效
    semi-static:运行一段时间后根据结果再进行静态分配

  • dynamic assignment:to achieve good load balancing
    可以视为使用了一个巨大的任务队列,每个进程挨个去取任务,自然而然有关于队列信息的进程间通信开销
    任务划分的权衡:任务划分的更细就有更多的灵活性以确保负载均衡,但是要同步队列状态可能导致极高的通信开销(lock and unlock)
    也有可能就是有个别任务时间极长:一种方法是先分配长时间的任务(sorted)

const int N = 1024;  
const int GRANULARITY = 10;
// assume allocations are only executed by 1 thread
float* x = new float[N];
bool* prime = new bool[N];
// assume elements of x are initialized here
LOCK counter_lock;
int counter = 0;
while (1) {
int i;
lock(counter_lock);
i = counter;
counter += GRANULARITY;
unlock(counter_lock);
if (i >= N)
break;
int end = min(i + GRANULARITY, N);
for (int j=i; j<end; j++)
is_prime[i] = test_primality(x[i]);
}

减少通信开销:每个线程给自己独立的任务队列,优先从自己的队列中取任务
任务队列中的任务可能具有依赖关系:线程也可以将任务添加到队列中(如果b依赖于a,某个线程执行完a后把b加到队列里面去)

Scheduling fork-join parallelism

之前谈的大多是数据并行性(data parallelism:对不同数据进行相同操作)
另一种表示并行的方法:树
e.g:快排,其天然可以并行分治部分,由于递归可以带来很高的并行性
alt text
alt text
notice:the cilk is just the abstraction,for implementation,complete sequential and complete parallel are acceptable

术语:child(spawn出来的),continuation(spawn后面的)
实际上有两种策略(run child first and run continuation first)

  • continuation first:
    如果只是单纯for loop创建一堆任务,BFT of call graph
    caller spawn all thread before any one executed
  • child first:
    caller only create one thing for stealing
    depth first traversal of call graph

工作队列实际上使用的是双端队列:从尾部添加元素,头部移除元素(分治情况下工作量大,良好局部性)
可证明空闲线程随机steal已有相当好的性能

  • implementation:
    对整个spawn到sync命名为一整个block,
    当发生stole的时候,创建关于该block的descriptor:记录spawn了多少个任务,finish了多少个任务
    alt text

optimization:locality,communication and contention

  • 共享地址空间(像是一个公告板,软件实现简单,硬件实现相当复杂)
    alt text
  • 另一种通信机制:message passing
    更像是送邮件,不公开,由发送者和接收者之间
    alt text
    二维网格示例:每个线程需要存储比其负责更新的点更多的点以用于更新
    通信全部体现在send and receive(没有共享,每个都有副本所以不用互斥锁,而barrier是隐含在里面)
  • synchronize(blocking) send and receive
    send 之后要接到对方的信息才return, receive 之后 送出接收到的信息再return
    这样之前的实现就会导致死锁(大家都在等别人送回来)
  • asynchronize(non-blocking) send and receive
    调用之后立即返回一个句柄,不立即发送或接受(异步),在要确认接受/发送的情况下可以检测句柄

arithmetic intensity:
amount of computation/amount of communication

inherent vs. artifactual communication

inherent communication:communication that must occur in parallel algorithm
way to reduce:in the 2d grid example
arithmetic intensity is roughly 面积除以周长(所以按正方形来block可能就会比较好)

Artifactual communication: all other communication

  • example:由于缓存导致的通信
    大矩阵的按行遍历:alt text
    也可以通过fusing loop 来减少load(比如三个算术操作一次性计算,不用生成并访问临时数组)

contention

alt text

some tips:

  • measure
  • determine what is your programs limits
    alt text
  • be aware of scailing laws

assignment2

cpp synchronization

  • mutex
    专门一个类std::mutex
    mutex::lock()或者mutex::unlock
  • condition variables
    一个thread的condition_variable call wait(lk)表示它期望等到别的线程的notification,当wait返回的时候,当前线程就是lock holder(lk被std::unique_lock wrap)
    notify()notify一个别的线程,notify_all()所有线程

cpp报错
当看到几百行报错时,千万不要从第一行顺着往下读。你的眼睛需要像雷达一样寻找几个关键英文词组。
一个典型的模板报错结构如下:
In file included from… (包含路径,忽略)
In instantiation of… (模板实例化成什么具体类型,留心)
required from… (这是谁调用了模板,必定包含你的代码行号,重点!)
error: … (具体的错误原因,重点!)
note: candidate… (编译器尝试过的其他重载,通常是推导失败的原因,选看)
黄金法则:先找 required from,再找 error:

partA

传给线程的参数要么复制,要么就要保证在线程全周期该对象都不变(后者不好搞因为异步执行)
测量的时候:可能有很多次进行多任务并发,如果不重置任务计数就会仅仅只能测一次

  • spawn:
    spawn threads in every run:也可以固定线程数来分任务(千万别多少个任务多少个线程)
  • spinning:
TaskSystemParallelThreadPoolSpinning::TaskSystemParallelThreadPoolSpinning(int num_threads)
: ITaskSystem(num_threads) {

// 线程池一旦创建就是整个期间一直在等待或执行任务
// 这里理论上应该用 std::atomic<bool>,否则存在数据竞争
ended = false;
pool_num = num_threads;
pool = new std::thread[pool_num];
cur_available = -1;
num_total = -1;
// 互斥锁,用于保护共享变量:
// cur_available / num_total / ran
notice = new std::mutex();
// this 表示成员函数需要绑定当前对象
// 线程创建一定在最后,否则可能访问到未初始化对象
for (int i = 0; i < pool_num; i++) {
pool[i] = std::thread(
&TaskSystemParallelThreadPoolSpinning::help_pool,
this
);
}
}

TaskSystemParallelThreadPoolSpinning::~TaskSystemParallelThreadPoolSpinning() {
ended = true;
// 等待所有线程结束(必须 join)
// 否则线程仍在运行,程序会出现未定义行为
for (int i = 0; i < pool_num; i++) {
pool[i].join();
}
delete[] pool;
delete notice;
}
void TaskSystemParallelThreadPoolSpinning::help_pool() {
// worker 线程主循环(自旋)
// 不断尝试获取任务并执行
while (!ended) {
int temp = -1; // 用于保存当前线程领取的任务编号
// ===== 临界区:任务分配 =====
// 多线程竞争 cur_available,需要加锁保护
notice->lock();
if (cur_available < num_total) {//初始化两者相同,所以线程空闲
temp = cur_available;
cur_available++;
}
notice->unlock();
// ===== 临界区结束 =====
// 如果成功领取到任务(temp != -1)
if (temp != -1) {
ran->runTask(temp, num_total);
// 原子增加完成计数
// 必须用 atomic,否则多个线程同时写会出现数据竞争
// memory_order_relaxed 表示:
// 只保证原子性,不保证顺序(性能更好)
counter.fetch_add(1, std::memory_order_relaxed);
}
// 如果 temp == -1:
// 当前没有任务,线程继续 while 循环(自旋等待)
}
}
void TaskSystemParallelThreadPoolSpinning::run(
IRunnable* runnable,
int num_total_tasks
) {
counter.store(0);
// ===== 临界区:发布任务 =====(runner和worker对这些变量也可能存在竞争)
notice->lock();
num_total = num_total_tasks;
// 设置任务执行对象
// worker 线程会调用 ran->runTask(...)
ran = runnable;
cur_available = 0;
notice->unlock();
// ===== 临界区结束 =====
// 主线程等待所有任务完成(自旋)
while (counter.load() < num_total_tasks) {
// 主动让出 CPU 时间片
// 避免完全占满 CPU(但仍属于 busy-wait)
std::this_thread::yield();
}
}

std::atomic<int>.load()//非原子操作,只有fetch_add才是

  • sleeping
TaskSystemParallelThreadPoolSleeping::TaskSystemParallelThreadPoolSleeping(int num_threads): ITaskSystem(num_threads) {
flag=false;
pool_size=num_threads;
pool=new std::thread[pool_size];
cond=new std::condition_variable;
finish=new std::condition_variable;
locker=new std::mutex;
waiter=new std::mutex;
queuefront=-1;tasksize=-1;
for(int i=0;i<pool_size;i++){
pool[i]=std::thread(&TaskSystemParallelThreadPoolSleeping::help_run,this);
}
}

TaskSystemParallelThreadPoolSleeping::~TaskSystemParallelThreadPoolSleeping() {
flag=true;
cond->notify_all();
for(int i=0;i<pool_size;i++){
pool[i].join();
}
delete[] pool;
delete cond;
delete locker;
delete finish;
delete waiter;

}
void TaskSystemParallelThreadPoolSleeping::help_run(){
while(true){
std::unique_lock<std::mutex> temp(*locker);
cond->wait(temp,[this]{return queuefront.load()<tasksize || flag;});
//防止线程a,b同时被唤醒,a接下了最后一个任务,之后切换到b
if(flag && queuefront.load()>=tasksize){
return;
}
if(queuefront.load()>=tasksize){
continue;
}
int id=queuefront.fetch_add(1);
temp.unlock();
ran->runTask(id,tasksize);
temp.lock();
if(counter.fetch_add(1)==tasksize-1)//仅仅最后一个任务去note
finish->notify_all();
temp.unlock();
}
}

void TaskSystemParallelThreadPoolSleeping::run(IRunnable* runnable, int num_total_tasks) {
queuefront.store(0);
counter.store(0);
tasksize=num_total_tasks;
ran=runnable;
cond->notify_all();
std::unique_lock<std::mutex> ano(*waiter);//这一句已经获取了这个锁
finish->wait(ano,[this]{return counter.load()==tasksize;});//wait的那个锁必须是当前线程已得到的
//防止:任务取完了但是没执行完
}

partB

还是别用真的队列,竞争,用原子计数器模拟就挺好
wait最好都用谓词免得虚假唤醒

Task::Task(int i){
SingleState=WaitingForSchedule;
Taskid=i;
}
/* 构造函数:初始化同步原语并启动线程池 */
TaskSystemParallelThreadPoolSleeping::TaskSystemParallelThreadPoolSleeping(int num_threads)
: ITaskSystem(num_threads) {

flag = false;
Groupid = 0;
TaskVar = new std::condition_variable;
GroupVar = new std::condition_variable;
TaskLock = new std::mutex;
GroupLock = new std::mutex;

for (int i = 0; i < num_threads; i++) {
threads.emplace_back(&TaskSystemParallelThreadPoolSleeping::ThreadBehave, this);
}
}
/* 析构函数:确保安全退出并释放内存 */
TaskSystemParallelThreadPoolSleeping::~TaskSystemParallelThreadPoolSleeping() {
{
std::lock_guard<std::mutex> lock(*TaskLock);
flag = true;
}
TaskVar->notify_all(); // 唤醒所有正在 wait 的线程

for (auto& t : threads) {
if (t.joinable()) t.join();
}

delete TaskVar; delete GroupVar;
delete TaskLock; delete GroupLock;

for (auto& pair : Groups) delete pair.second;
}
/* 核心线程行为:原子分发与依赖检查 */
void TaskSystemParallelThreadPoolSleeping::ThreadBehave() {
while (true) {
TaskGroup* target = nullptr;
int task_id = -1;

std::unique_lock<std::mutex> lock(*TaskLock);

// 核心调度:寻找一个依赖已满足且还有任务可领的组
TaskVar->wait(lock, [this, &target, &task_id] {
if (flag) return true;
for (auto& pair : Groups) {
TaskGroup* tg = pair.second;
if (!tg->isFinished.load()) {
// 检查所有依赖是否已完成
bool ready = true;
for (auto& dep_id : tg->deps) {
if (!Groups[dep_id]->isFinished.load()) {
ready = false; break;
}
}
if (ready) {
int idx = tg->next_idx.fetch_add(1);
if (idx < tg->TotalNum) {
target = tg;
task_id = idx;
return true;
}
}
}
}
return false;
});

if (flag) return;
lock.unlock(); // 拿到 ID 后立刻释放全局锁,允许其他线程领取

// 执行任务
target->worker->runTask(task_id, target->TotalNum);

// 任务完成后更新计数
if (target->completed_count.fetch_add(1) + 1 == target->TotalNum) {
{
std::lock_guard<std::mutex> gl(*GroupLock);
target->isFinished.store(true);
}
GroupVar->notify_all(); // 唤醒 sync()
TaskVar->notify_all(); // 唤醒可能在等待此依赖的其他线程
}
}
}
/* 异步提交:不阻塞主线程 */
TaskID TaskSystemParallelThreadPoolSleeping::runAsyncWithDeps(IRunnable* runnable, int num_total_tasks, const std::vector<TaskID>& deps) {
std::lock_guard<std::mutex> lock(*TaskLock);
int curid = Groupid++;
TaskGroup* newgroup = new TaskGroup(curid, num_total_tasks, runnable, deps);
Groups[curid] = newgroup;

TaskVar->notify_all();
return curid;
}
/* 同步:阻塞主线程直至所有任务完成 */
void TaskSystemParallelThreadPoolSleeping::sync() {
std::unique_lock<std::mutex> l(*GroupLock);
GroupVar->wait(l, [this] {
for (auto const& pair : Groups) {
if (!pair.second->isFinished.load()) return false;
}
return true;
});
}
void TaskSystemParallelThreadPoolSleeping::run(IRunnable* runnable, int num_total_tasks) {
runAsyncWithDeps(runnable, num_total_tasks, {});
sync();
}