Stanford-CS149-并行计算-Assignment2-任务执行库

0 作业简单介绍

这次的作业是实现一个任务执行库, 可以看做是mini版的ispc, 需要实现task的创建、执行, 并且需要支持task的依赖关系, 也就是task的执行顺序。难度基本上就是一个C++并发编程的练习, 这里做一个简单的介绍。

作业的核心就是下面几个函数:

1
2
3
4
5
6
7
8
9
10
virtual void run(IRunnable* runnable, int num_total_tasks) = 0;
virtual TaskID runAsyncWithDeps(IRunnable* runnable, int num_total_tasks,
const std::vector<TaskID>& deps) = 0;
virtual void sync() = 0;

class IRunnable {
public:
virtual ~IRunnable();
virtual void runTask(int task_id, int num_total_tasks) = 0;
};
  • 这里的IRunnable是一个接口, 对应的就是具体的并行计算任务, 这里我们只需要调用IRunnablerunTask函数即可。task_id就是单个任务的序号, 类似ispcprogramIndex, 而num_total_tasks就类似总ispcprogramCount, 这里就不展开了。
  • run函数是同步执行的, 也就是run函数执行完毕后, 所有的task都执行完毕了。
  • runAsyncWithDeps函数是异步执行的, 其不等待task执行完毕, 而是直接返回。返回的TaskIDtask的唯一标识, 可以用于后续的依赖关系设置。
  • sync: 同步函数, 等待所有task执行完毕。

1 Part A

Part A只要求完成同步的run函数, 官方还贴心的给出了逐步的实现步骤, 首先是顺序执行的版本, 然后是立即创建线程的版本, 再然后是使用线程池的版本, 最后是使用线程池+条件变量的版本。

1.1 顺序执行的版本

这个版本不需要自己做啥操作, 官方给出了实现

1.2 立即创建线程的版本

这里要求使用的线程不超过给定的数量, 这里我的实现是直接创建容许的最大线程数, 然后每个线程不断地轮序下一个执行任务的序号:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void TaskSystemParallelSpawn::run(IRunnable* runnable, int num_total_tasks) {
std::atomic<int> task_id(0);

auto func = [&]() {
while (true) {
int cur_task_id = task_id.fetch_add(1);
if (cur_task_id >= num_total_tasks) {
return;
}
runnable->runTask(cur_task_id, num_total_tasks);
}
};

std::vector<std::thread> threads;
for (int i = 0; i < max_threads; i++) {
threads.push_back(std::thread(func));
}

for (auto& thread : threads) {
thread.join();
}
}

1.3 使用线程池的版本

这个稍微复杂一点,但对于Cpper来说也算是经典的面试场景或者八股了

首先线程池的每个线程需要不断地从任务队列中取出任务并执行,因此一定是个死循环的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void TaskSystemParallelThreadPoolSpinning::worker() {
while (!is_terminate) {
std::unique_lock<std::mutex> lock(mspin);
if (left_task_num <= 0) {
lock.unlock();
std::this_thread::yield(); // 让出 CPU 时间片,减少自旋等待
continue;
}
int task_id = total_task_num - left_task_num;
left_task_num--;
lock.unlock(); // 提前释放锁

runner->runTask(task_id, total_task_num);

finished_task_num++;
}
}

构造函数需要创建线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
TaskSystemParallelThreadPoolSpinning::TaskSystemParallelThreadPoolSpinning(int num_threads): ITaskSystem(num_threads), max_threads(num_threads) {
//
// CS149 student implementations may decide to perform setup
// operations (such as thread pool construction) here.
// Implementations are free to add new class member variables
// (requiring changes to tasksys.h).
//
for (int i = 0; i < this->max_threads; i++) {
this->worker_pool.push_back(std::thread([this]() {
worker();
}));
}
}

run函数直接修改任务参数即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void TaskSystemParallelThreadPoolSpinning::run(IRunnable* runnable, int num_total_tasks) {


//
// CS149 students will modify the implementation of this
// method in Part A. The implementation provided below runs all
// tasks sequentially on the calling thread.
//

finished_task_num = 0;
this->total_task_num = num_total_tasks;
runner = runnable;

{
std::lock_guard<std::mutex> lock(mspin);
left_task_num = num_total_tasks;
}

while (finished_task_num < num_total_tasks) {
std::this_thread::yield(); // 让出 CPU 时间片,减少自旋等待
}
}

需要注意的是, 这里并没有显式的队列的概念, 因为每个任务只需要一个序号即可, 因此维护的left_task_num就是逻辑上的一个队列。
另一方面, td::this_thread::yield()是让出当前线程的执行权, 减少自旋等待, 从而减少CPU的消耗。

1.4 使用线程池+条件变量的版本

这个版本需要使用条件变量来等待所有任务执行完毕, 这里需要使用std::condition_variablestd::mutex来实现。其实和前者差距不大,就是在while循环中使用cv.wait来等待所有任务执行完毕。

worker函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void TaskSystemParallelThreadPoolSleeping::worker() {
while (true) {
std::unique_lock<std::mutex> lock(mtx_worker);
auto wait_func = [this] { return this->stop || this->left_task_num > 0; };
cv_worker.wait(lock, wait_func);

if (stop && left_task_num == 0) {
break;
}

int task_id = total_task_num - left_task_num;
left_task_num--;
lock.unlock();

runner->runTask(task_id, total_task_num);

{
std::lock_guard<std::mutex> finish_lock(mtx_finish);
finished_task_num++;
if (finished_task_num == total_task_num) {
cv_finish.notify_one();
}
}
}
}

run函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void TaskSystemParallelThreadPoolSleeping::run(IRunnable* runnable, int num_total_tasks) {
runner = runnable;
finished_task_num = 0;
total_task_num = num_total_tasks;

{
std::lock_guard<std::mutex> lock(mtx_worker);
left_task_num = num_total_tasks;
}

cv_worker.notify_all();

std::unique_lock<std::mutex> lock(mtx_finish);
auto wait_func = [this](){return this->finished_task_num == this->total_task_num;};
cv_finish.wait(lock, wait_func);
}

2 Part B

这一部分要求实现runAsyncWithDeps函数, 也就是异步执行的函数, 需要支持任务的依赖关系。
直接贴出代码吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
/*
* ================================================================
* Parallel Thread Pool Sleeping Task System Implementation
* ================================================================
*/

const char* TaskSystemParallelThreadPoolSleeping::name() {
return "Parallel + Thread Pool + Sleep";
}

void TaskSystemParallelThreadPoolSleeping::worker() {
while (true) {
std::unique_lock<std::mutex> lock(mtx_worker);
auto wait_func = [this] { return this->stop || this->left_task_num > 0; };
cv_worker.wait(lock, wait_func);

if (stop && left_task_num == 0) {
break;
}

int task_id = total_task_num - left_task_num;
left_task_num--;
lock.unlock();

runner->runTask(task_id, total_task_num);

{
std::lock_guard<std::mutex> finish_lock(mtx_finish);
finished_task_num++;
if (finished_task_num == total_task_num) {
cv_finish.notify_one();
}
}
}
}

TaskSystemParallelThreadPoolSleeping::TaskSystemParallelThreadPoolSleeping(int num_threads): ITaskSystem(num_threads), num_threads(num_threads) {
//
// TODO: CS149 student implementations may decide to perform setup
// operations (such as thread pool construction) here.
// Implementations are free to add new class member variables
// (requiring changes to tasksys.h).
//
stop = false;
total_task_num = left_task_num = 0;

for (int i = 0; i < this->num_threads; ++i) {
workers.push_back(std::thread([this]() { worker(); }));
}
}

TaskSystemParallelThreadPoolSleeping::~TaskSystemParallelThreadPoolSleeping() {
//
// TODO: CS149 student implementations may decide to perform cleanup
// operations (such as thread pool shutdown construction) here.
// Implementations are free to add new class member variables
// (requiring changes to tasksys.h).
//
this->stop = true;
cv_worker.notify_all();
for (auto& worker : workers) {
if (worker.joinable()) {
worker.join(); // 等待所有线程完成
}
}

// 释放所有任务上下文
for (auto& pair : task_contexts) {
delete pair.second;
}
}

void TaskSystemParallelThreadPoolSleeping::run(IRunnable* runnable, int num_total_tasks) {


//
// TODO: CS149 students will modify the implementation of this
// method in Parts A and B. The implementation provided below runs all
// tasks sequentially on the calling thread.
//

runner = runnable;
finished_task_num = 0;
total_task_num = num_total_tasks;

{
std::lock_guard<std::mutex> lock(mtx_worker);
left_task_num = num_total_tasks;
}

cv_worker.notify_all();

std::unique_lock<std::mutex> lock(mtx_finish);
auto wait_func = [this](){return this->finished_task_num == this->total_task_num;};
cv_finish.wait(lock, wait_func);
}

TaskID TaskSystemParallelThreadPoolSleeping::runAsyncWithDeps(IRunnable* runnable, int num_total_tasks,
const std::vector<TaskID>& deps) {


//
// TODO: CS149 students will implement this method in Part B.
//

int cur_task_id = this->next_task_id.fetch_add(1);
TaskContext* task_context = new TaskContext(cur_task_id);

// 在另一个线程中执行任务 TaskSystemParallelThreadPoolSleeping::run 函数
std::thread([this, runnable, num_total_tasks, task_context, &deps]() {
// 先等待依赖任务完成
for (const auto& dep : deps) {
TaskContext* task_context = this->task_contexts[dep];
std::unique_lock<std::mutex> lock(task_context->mtx);
task_context->cv.wait(lock, [task_context]() { return task_context->is_finished; });
}

this->run(runnable, num_total_tasks);
{
std::lock_guard<std::mutex> lock(task_context->mtx);
task_context->is_finished = true;
}
task_context->cv.notify_all();
}).detach();

this->task_contexts[cur_task_id] = task_context;

return cur_task_id;
}

void TaskSystemParallelThreadPoolSleeping::sync() {

//
// TODO: CS149 students will modify the implementation of this method in Part B.
//
for (const auto& pair : this->task_contexts) {
TaskContext* task_context = pair.second;

std::unique_lock<std::mutex> lock(task_context->mtx);
task_context->cv.wait(lock, [task_context]() { return task_context->is_finished; });
}

return;
}

目前的实现还有些性能问题, 这里mark一下, 最近时间比较紧张, 等有时间了再优化一下。