voidTaskSystemParallelSpawn::run(IRunnable* runnable, int num_total_tasks){ std::atomic<int> task_id(0);
auto func = [&]() { while (true) { int cur_task_id = task_id.fetch_add(1); if (cur_task_id >= num_total_tasks) { return; } runnable->runTask(cur_task_id, num_total_tasks); } };
std::vector<std::thread> threads; for (int i = 0; i < max_threads; i++) { threads.push_back(std::thread(func)); }
for (auto& thread : threads) { thread.join(); } }
1.3 使用线程池的版本
这个稍微复杂一点,但对于Cpper来说也算是经典的面试场景或者八股了
首先线程池的每个线程需要不断地从任务队列中取出任务并执行,因此一定是个死循环的函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
voidTaskSystemParallelThreadPoolSpinning::worker(){ while (!is_terminate) { std::unique_lock<std::mutex> lock(mspin); if (left_task_num <= 0) { lock.unlock(); std::this_thread::yield(); // 让出 CPU 时间片,减少自旋等待 continue; } int task_id = total_task_num - left_task_num; left_task_num--; lock.unlock(); // 提前释放锁
runner->runTask(task_id, total_task_num);
finished_task_num++; } }
构造函数需要创建线程池:
1 2 3 4 5 6 7 8 9 10 11 12 13
TaskSystemParallelThreadPoolSpinning::TaskSystemParallelThreadPoolSpinning(int num_threads): ITaskSystem(num_threads), max_threads(num_threads) { // // CS149 student implementations may decide to perform setup // operations (such as thread pool construction) here. // Implementations are free to add new class member variables // (requiring changes to tasksys.h). // for (int i = 0; i < this->max_threads; i++) { this->worker_pool.push_back(std::thread([this]() { worker(); })); } }
voidTaskSystemParallelThreadPoolSpinning::run(IRunnable* runnable, int num_total_tasks){
// // CS149 students will modify the implementation of this // method in Part A. The implementation provided below runs all // tasks sequentially on the calling thread. //
TaskSystemParallelThreadPoolSleeping::TaskSystemParallelThreadPoolSleeping(int num_threads): ITaskSystem(num_threads), num_threads(num_threads) { // // TODO: CS149 student implementations may decide to perform setup // operations (such as thread pool construction) here. // Implementations are free to add new class member variables // (requiring changes to tasksys.h). // stop = false; total_task_num = left_task_num = 0;
for (int i = 0; i < this->num_threads; ++i) { workers.push_back(std::thread([this]() { worker(); })); } }
TaskSystemParallelThreadPoolSleeping::~TaskSystemParallelThreadPoolSleeping() { // // TODO: CS149 student implementations may decide to perform cleanup // operations (such as thread pool shutdown construction) here. // Implementations are free to add new class member variables // (requiring changes to tasksys.h). // this->stop = true; cv_worker.notify_all(); for (auto& worker : workers) { if (worker.joinable()) { worker.join(); // 等待所有线程完成 } }
voidTaskSystemParallelThreadPoolSleeping::run(IRunnable* runnable, int num_total_tasks){
// // TODO: CS149 students will modify the implementation of this // method in Parts A and B. The implementation provided below runs all // tasks sequentially on the calling thread. //
// // TODO: CS149 students will modify the implementation of this method in Part B. // for (constauto& pair : this->task_contexts) { TaskContext* task_context = pair.second;