【动手写协程库 2】协程调度器

【动手写协程库】系列笔记是学习 sylar 的协程库时的记录,参考了 从零开始重写sylar C++高性能分布式服务器框架 和代码随想录中的 文档。文章并不是对所有代码的详细解释,而是为了自己理解一些片段所做的笔记。

Scheduler 类中其他函数的定义可以在这里查看:Github: src/scheduler.cpp

Sylar 的协程调度器是一个 N-M 模型,意味着 N 个线程可以运行 M 个协程,协程能够在线程之间进行切换,也可以被绑定到特定的线程上执行。

调度器可以由应用程序中的任何线程创建,但创建它的线程(称为 caller 线程)可以选择是否参与协程的调度。如果 caller 线程参与调度,那么调度器的线程数会相应减少一个,因为 caller 线程本身也会作为一个调度线程。

Scheduler 相关 API 如下:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#ifndef SCHEDULER_H_
#define SCHEDULER_H_

#include <atomic>
#include <cstddef>
#include <functional>
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>
#include "coroutine.h"
#include "util.h"

// 协程调度器
class Scheduler {
private:
    // 调度任务,任务类型可以是协程/函数二选一,并且可指定调度线程
    using ThreadIdPtr = std::shared_ptr<std::thread::id>;

    struct SchedulerTask {
        ThreadIdPtr thread_id_;
        Coroutine::Ptr coroutine_;
        std::function<void()> callback_;

        SchedulerTask() {}

        SchedulerTask(Coroutine::Ptr co, ThreadIdPtr id)
            : coroutine_(co)
            , thread_id_(std::move(id)) {}

        SchedulerTask(std::function<void()> callback, ThreadIdPtr id)
            : callback_(callback)
            , thread_id_(std::move(id)) {}

        void Reset() {
            thread_id_.reset();
            callback_ = nullptr;
            coroutine_ = nullptr;
        }
    };

public:
    Scheduler(size_t threads = 1, bool use_caller = true, const std::string& name = "scheduler");

    virtual ~Scheduler();

    std::string GetName() const {
        return name_;
    }

    void Start();

    void Stop();

    template <typename TaskType>
    void Sched(TaskType t, ThreadIdPtr id = nullptr) requires(std::invocable<TaskType> || std::same_as<TaskType, Coroutine::Ptr>) {
        bool is_need_tick = false;

        {
            std::lock_guard lock(mutex_);
            is_need_tick = tasks_.empty();
            SchedulerTask task(t, id);
            if (task.callback_ || task.coroutine_) {
                tasks_.push_back(task);
            }
        }

        if (is_need_tick) {
            Tickle();
        }
    }

public:
    static Scheduler* GetScheduler();

    static Coroutine* GetSchedCoroutine();

protected:
    virtual void Tickle();

    void Run();

    void SetThisAsScheduler();

    virtual void Idle();

    virtual bool IsStop();

    bool HasIdleThreads() {
        return idle_threads_ > 0;
    }

private:
    std::string name_;
    std::mutex mutex_;
    std::vector<std::thread> thread_pool_;
    std::vector<std::thread::id> thread_ids_;
    std::list<SchedulerTask> tasks_;

    size_t threads_size_;
    std::atomic_size_t active_threads_{0};
    std::atomic_size_t idle_threads_{0};

    std::thread::id sched_id_; // use_caller为true时,调度器所在的线程id
    Coroutine::Ptr sched_co_;  // use_caller为true时调度器所在线程的调度协程
    bool is_stop_;
    bool is_use_caller_;
};

#endif /* SCHEDULER_H_ */

调度器的工作流大致为:

  • 协程调度器在初始化时可传入线程数和一个布尔型的 use_caller 参数,表示是否使用 caller 线程。在使用 caller 线程的情况下,线程数自动减一,并且调度器内部会初始化一个属于 caller 线程的调度协程并保存起来(比如,在 main 函数中创建的调度器,如果 use_caller 为 true,那调度器会初始化一个属于 main 函数线程的调度协程)。
  • 调度器创建好后 ,即可调用调度器的 Sched 函数向调度器添加调度任务,但此时调度器并不会立刻执行这些任务,而是将它们保存到内部的一个任务队列中。
  • 调用 Scheduler::Start() 函数启动调度 。调用 Start 会创建调度线程池,线程数量由初始化时的线程数和 use_caller 确定。调度线程一旦创建,就会立刻从任务队列里取任务执行。比较特殊的一点是,如果初始化时指定线程数为 1 且 use_caller 为 true,那么 Start 方法什么也不做,因为不需要创建新线程用于调度。并且,由于没有创建新的调度线程,那只能由 caller 线程的调度协程来负责调度协程(这里有点绕),而 caller 线程的调度协程的执行时机与 Start 函数并不在同一个地方。caller 线程的调度协程的执行时机在 Stop 函数中。
  • 接下来是调度协程,对应 Scheduler::Run() 。调度协程负责从调度器的任务队列中取任务执行。取出的任务即子协程,每个子协程执行完后都必须返回调度协程,由调度协程重新从任务队列中取新的协程并执行。如果任务队列空了,那么调度协程会切换到一个 idle 协程,等有新任务进来时,idle 协程才会退出并回到调度协程,重新开始下一轮调度。(在 Scheduler 中,idle 函数的定义十分简单粗暴,因为实际使用协程库时并不是直接使用 Scheduler 类,而是使用它的派生类,在派生类中将会实现更为完善的调度)
  • 接下来是添加调度任务,对应 Scheduler::Sched() ,这个方法支持传入协程或函数,并且支持一个线程 id 参数,表示是否将这个协程或函数绑定到一个具体的线程上执行。如果任务队列为空,那么在添加任务之后,要调用一次 tickle 方法以通知各调度线程的调度协程有新任务来了。在执行调度任务时,还可以通过调度器的 GetScheduler() 获取到当前调度器,再通过 Sched 函数继续添加新的任务,这就变相实现了在子协程中创建并运行新的子协程的功能。
  • 接下来是调度器的停止。调度器的停止行为要分两种情况讨论,首先是 use_caller 为 false 的情况,这种情况下,由于没有使用 caller 线程进行调度,那么只需要简单地等各个调度线程的调度协程退出就行了。如果 use_caller 为 true,表示 caller 线程也要参于调度,这时,调度器初始化时记录的属于 caller 线程的调度协程就要起作用了,在调度器停止前,应该让这个 caller 线程的调度协程也运行一次,让 caller 线程完成调度工作后再退出。如果调度器只使用了 caller 线程进行调度,那么所有的调度任务要在调度器停止时才会被调度。

调度器中最重要的一个函数我认为就是 Run() 函数了,这个函数用于协程的调度,或者,你可以将他理解为是一个调度协程(名词)。

创建 Scheduler 时会为每一个内部线程池中的每一个线程都绑定一个调度协程,线程数量默认为 1,此时也默认会使用 caller 线程,也就是使用的主线程。调度协程 Scheduler::Run() 会从任务队列 Task Queue 中不断去取任务去执行。如果有任务可执行,那就切换至任务协程执行,任务协程执行完毕后又切换回调度协程;无任务执行时,调度协程切换至 Idle 协程进行等待。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// 用于协程的调度
void Scheduler::Run() {
    LOG << "Scheduler running...\n";
    SetThisAsScheduler();

    // 如果当前线程不是调度器所在线程,设置调度的协程为当前线程运行的协程
    if (std::this_thread::get_id() != sched_id_) {
        sched_coroutine = Coroutine::GetNowCoroutine().get();
    }

    Coroutine::Ptr idle_co = std::make_shared<Coroutine>([this] { this->Idle(); });
    Coroutine::Ptr callback_co;

    SchedulerTask task;

    while (true) {
        task.Reset();
        bool tickle = false;
        {
            std::lock_guard lock(mutex_);
            auto iter = tasks_.begin();
            while (iter != tasks_.end()) {
                // 当前遍历的task已经分配了线程去执行且这个线程不是当前线程,则不用管
                if (iter->thread_id_ && *iter->thread_id_ != std::this_thread::get_id()) {
                    ++iter;
                    tickle = true;
                    continue;
                }
                if (iter->coroutine_ && iter->coroutine_->GetState() != Coroutine::READY) {
                    LOG << "Coroutine task's state should be READY!\n";
                    assert(false);
                }
                task = *iter;
                tasks_.erase(iter++);
                active_threads_++;
                break;
            }
            // 有任务可以去执行,需要tickle一下
            tickle |= (iter != tasks_.end());
        }
        if (tickle) {
            Tickle();
        }

        // 子协程执行完毕后yield会回到Run()中
        if (task.coroutine_) {
            // 任务类型为协程
            task.coroutine_->Resume();
            active_threads_--;
        } else if (task.callback_) {
            // 任务类型为回调函数
            if (callback_co) {
                callback_co->Reset(task.callback_);
            } else {
                callback_co = std::make_shared<Coroutine>(task.callback_);
            }
            callback_co->Resume();
            active_threads_--;
        } else {
            // 无任务,任务队列为空
            if (idle_co->GetState() == Coroutine::FINISH) {
                LOG << "Idle coroutine finish\n";
                break;
            }
            idle_threads_++;
            idle_co->Resume(); // Idle最后Yeild时回到这里
            idle_threads_--;
        }
    }
    LOG << "Scheduler Run() exit\n";
}

这个 Scheduler 是一个很简单的调度器,要对任务做更好的调度,少不了 Idle 协程的帮助。Idle 协程的具体实现要在之后的 IOManager 中,其继承自 Scheduler,重写了 Tickle()Idle() 等函数,并且使用 epoll 来实现在不同的 I/O 事件发生时,触发相应的处理逻辑。这使得程序可以以非阻塞的方式处理多个 I/O 操作,而不必等待每个操作完成后再进行下一个操作。

Licensed under CC BY-NC-SA 4.0
最后更新于 2026年3月18日星期三
使用 Hugo 构建
主题 StackJimmy 设计