Skip to content

我们常常需要在一个异步任务执行完毕时,取消与其有关的另外一些异步任务。例如,我们可能希望一个IO请求超时后能及时终止。async-simple基于协作式的信号-槽模型,提供了一套通用,线程安全且高效的异步任务取消机制,并支持结构化的任务取消。

Signal 与 Slot

async-simple提供了如下信号类型,每一个bit代表一种信号:

cpp
enum class SignalType : uint64_t {
    none = 0,
    // 低32bit的信号只能触发一次。
    terminal = 0b1,                   // 最低的1bit代表终止信号。
                                      // 2-16bit为async-simple保留位。
                                      // 17-32bit给用户自定义扩展。
    // 高32bit的信号可以多次触发(多次调用信号处理函数)。
                                      // 33-48bit为async-simple保留位。
                                      // 49-64bit给用户自定义扩展。
    all = 0xffff'ffff'ffff'ffff,      // 默认过滤级别(不过滤任何信号)
};

Signal类型用于发起一个信号,而Slot则用于接收信号。我们可以通过工厂方法创建一个信号,将多个Slot绑定到同一个Signal上。当Signal触发信号时,所有Slot都会收到对应的信号。

最简单的办法是手动查询信号是否已触发:

cpp
// 通过工厂方法创建Signal
std::shared_ptr<Signal> signal = Signal::create();
std::vector<std::future<void>> works;
for (int i=0;i<10;++i) {
  // 为每一个异步任务创建Slot
  auto slot = std::make_unique<Slot>(signal.get());
  // 异步执行任务
  std::async(std::launch::async, [slot=std::move(slot)] {
      // 手动轮询取消状态
      while (!slot->hasTriggered(SignalType::terminate)) {
        // ...
      }
      return;
  });
}
// ...
// 提交取消信号
signal->emit(SignalType::terminate);
for (auto &e:works)
  e.get();

除了直接查询取消状态,我们可以在Slot中注册信号处理函数来接受信号。信号处理函数的签名应为void(SignalType, Signal*)。第一个参数SignalType代表经过滤后,本次成功触发的信号类型,第二个参数是指向信号的指针。

需要注意的是:

  1. 信号处理函数不应该阻塞。调用emit()函数并触发信号时,程序会遍历绑定在Signal上的信号处理函数并立即执行。
  2. 注意线程安全问题:信号处理函数会由调用emit()的线程执行,32号以上的信号处理函数还可能由多个线程并发执行。
  3. 信号处理函数禁止持有槽绑定的信号,这会导致信号的内存泄漏。用户应通过Signal*参数访问信号。

例如,下面这段代码通过信号回调函数取消睡眠。

cpp
// 通过工厂方法创建Signal
std::shared_ptr<Signal> signal = Signal::create();
std::vector<std::future<void>> works;
for (int i=0;i<10;++i) {
  // 为每一个异步任务创建槽
  auto slot = std::make_unique<Slot>(signal.get());
  // 异步执行任务
  std::async(std::launch::async, [slot=std::move(slot)] {
      std::unique_ptr<std::promise<void>> p;
      auto f = p->get_future();
      book ok = slot->emplace(SignalType::terminate, // 代表会触发该回调函数的bit位,该enum只能有一个bit位为1,如果提交的信号该位为1,则会触发回调处理函数。
          [p=std::move(p)](SignalType, Signal*){
          p->set_value();
      });
      if (ok) { //如果取消信号还未触发
          f.wait_for(1s*(rand()+1)); // 除非取消信号被触发,否则睡觉睡眠一段时间。
      }
      slot->clear(); // 清除回调函数
      if (slot->signal()) { //如果槽被绑定在信号上
          slot->signal()->emit(SignalType::terminate); // 触发取消信号。
      }
      return;
  });
}
// ...
// 第一个异步定时任务结束后其他任务也会一起结束。
for (auto &e:works)
  e.get();

上述代码中,我们通过调用Slotsignal()函数可以获取到指向Signal的指针。我们保证该函数返回的指针总是合法的,因为Signal的生命周期会被自动延长到最后一个绑定的槽析构之时。

上述代码可能会多次调用emit函数尝试触发取消信号,emit调用会返回本次成功触发的信号。

cpp
class Signal
    : public std::enable_shared_from_this<Signal> {
public:
    // 提交信号(允许一次提交多种信号),并返回本次请求成功触发的信号,线程安全。
    SignalType emit(SignalType state) noexcept;
    // 获取当前的信号,线程安全。
    SignalType state() const noexcept;
    // 创建信号的工厂方法,返回信号的shared_ptr,线程安全。
    static std::shared_ptr<Signal> create();
};

Signal的所有操作都是线程安全的。然而,由于各异步任务应持有各自的Slot,因此槽对象不是线程安全的。我们禁止用户并发调用同一个Slot的公共接口。

cpp
class Slot {
    // 将`Signal`与`Slot`绑定在一起。可以指定信号过滤级别,如果提交的信号type和filter做与运算的结果为0,则信号不会被触发。
    Slot(Signal* signal,
                     SignalType filter = SignalType::all);
    // 注册信号处理函数。用户可以多次注册信号处理函数,第二次注册会覆盖之前的信号处理函数。
    // SingalType: 代表该信号处理函数会响应的bit位。该enum只能有一个bit位为1,如果提交的信号该位为1,则会触发回调处理函数。
    // 返回false: 取消信号已经触发。
    template <typename... Args>
    [[nodiscard]] bool emplace(SignalType type, Args&&... args);
    // 清空信号处理函数,如果返回false说明信号处理函数已经被执行,或尚未注册过信号。
    bool clear();
    // 在指定作用域内过滤信号,如果取消信号type & filter为0,则信号type在该作用域内不会被触发。
    // 允许嵌套的添加filter。
    [[nodiscard]] FilterGuard setScopedFilter(SignalType filter);
    // 获取当前作用域的filter
    SignalType getFilter();
    // 判断信号是否处于触发状态(如果被过滤会返回false)。
    bool hasTriggered(SignalType) const noexcept;
    // 该函数返回`Slot`对应的`Signal`,如果在构造槽之前信号已经触发,则该函数返回nullptr。否则始终返回一个有效的`Signal`指针。这是因为slot持有对应的signal的所有权。如果想要延长signal的生命周期,可以调用signal()->shared_from_this(),也可以用signal启动一个新协程。
    Signal* signal() const noexcept;
};

我们可以将多个Signal串在一起。当某个Signal被触发时,该信号会被转发给串联的其他子Signal

cpp
std::shared_ptr<Signal> signal = Signal::create();
auto slot = std::make_unique<Slot>(signal.get());
std::shared_ptr<Signal> chainedSignal = Signal::create();
slot->addChainedSignal(chainedSignal);
signal->emit(SignalType::terminate);
assert(chainedSignal->state()==SignalType::terminate);
// 信号会被转发给chainedSignal
// 然而,chainedSignal触发的信号不会触发给signal
chainedSignal->emit(static_cast<SignalType>(0b10));
assert(signal->state()!=static_cast<SignalType>(0b10));

我们还可以通过继承Signal来为信号处理函数提供更多上下文信息:

cpp
class MySignal : public Signal {
    using Signal::Signal;
  public:
    std::atomic<size_t> myState;  
};

auto mySignal = Signal::create<MySignal>();
auto slot = std::make_unique<Slot>(mySignal->get());
slot->emplace([](SignalType type, Signal* signal) {
  auto mySignal = dynamic_cast<MySignal*>(signal);
  std::cout << "myState:" << mySignal->myState << std::endl;
});
mySignal->myState=1;
mySignal->emit(SignalType::terminate);

无栈协程支持

上述的槽与信号是较为低级的通用api,可适配各种异步场景。在async-simple无栈协程库Lazy中,提供了一系列高级封装支持,使得用户无需关心细节,也能实现结构化并发任务的取消操作。

collect函数与结构化并发

在一般用户代码中,我们建议使用collectAnycollectAll来实现结构化并发任务的取消操作。

collectAny

collectAny可以并发执行多个协程,并等待第一个协程返回。collectAny会自动将这些协程绑定到一个取消信号,并在第一个协程执行完毕时,向其他尚未执行完毕的协程发送取消信号,从而结束这些任务。

例如,下面是使用collectAny实现通用的超时处理逻辑的代码。如果async_read()在1s内没有返回,则sleep_1s()会先返回并取消async_read()。

cpp
Lazy<void> sleep_1s();
Lazy<std::string> async_read();
auto res = co_await collectAny<SignalType::terminate>(async_read(),sleep_1s());
if (std::get<res>() == 1) { // timed out!
  // ...
}
else {
  // ...
}

collectAny支持用户发送不同的取消信号。默认发送的取消信号是SignalType::none, 这会导致async_read()任务未被取消并继续执行。如果想让collectAny结束时取消其他任务,则可以选择发送信号SignalType::terminate

cpp
Lazy<void> work1();
Lazy<void> work2();
auto res = co_await collectAny<SignalType::none>(async_read(),sleep_1s());
if (std::get<res>() == 0) { 
  // work1 finished, work2 will still working
}
else { 
  // work2 finished, work1 will still working
}

collectAll

collectAny相同,collectAll也可以在第一个任务结束时发送一次信号。(默认情况下不发送信号)

cpp
Lazy<int> work1();
Lazy<std::string> work2();
// work1(), work2() all finished, no cancel
auto res = co_await collectAll(work1(),work2());
// work1(), work2() all finished, the later work will be canceled by SignalType::terminate
auto res = co_await collectAll<SignalType::terminate>(work1(),work2());

和collectAny不同,collectAll会等待协程执行完毕,并获取返回值。一方面,这简化了异步任务的生命周期。另一方面,如果取消信号没能中止任务,collectAll会等待其他任务执行完毕。

cpp
http_client client;
// 和collectAny不同, 我们无需通过引用计数延长client的生命周期,因为collectAll保证一定会等待http_client::connect协程返回。
auto res = co_await collectAll<SignalType::terminate>(client.connect("localhost:8080"), sleep(1s));
//如果不发送取消信号,collectAll会一直执行,直到时间超过一秒且客户端完成连接
auto res = co_await collectAll(client.connect("localhost:8080"), sleep(1s));

信号槽的传递与获取

一个Lazy可以持有一个Slot, 当我们想要为一个异步任务绑定取消信号时,只需要在任务开始阶段,通过Lazy<T>::setLazyLocal来绑定信号,它会随着co_await一路传递下去。需要注意的是,信号只能绑定一次。

我们可以通过co_await CurrentSlot{} 来获取指向Slot的指针,调用co_await ForbidSignal{}会让协程调用链与取消信号解绑,从而防止后续任务被中止。它会析构对应的Slot对象,并且之后调用co_await CurrentSlot{}会返回nullptr。

cpp
Lazy<void> subTask() {
    Slot* slot = co_await CurrentSlot{};
    assert(slot!=nullptr);
    co_await ForbidSignal{};
    // slot is illegal now.
    assert(co_await CurrentSlot{} == nullptr);
    co_returnl
}
Lazy<void> Task() {
    Slot* slot = co_await CurrentSlot{};
    assert(slot!=nullptr);
    co_await subTask();
    assert(co_await CurrentSlot{} == nullptr);
    co_return;
}
auto signal = Signal::create();
syncAwait(Task().setLazyLocal(signal.get()).via(ex));

支持取消操作与信号转发的对象与函数

除了手动判断取消信号是否被触发,async-simple许多可能挂起的函数都为取消操作提供了支持。此外,collect*函数支持将外部收到的信号转发给由collect*函数启动的协程。

以下各对象与函数支持取消操作,响应信号的协程可能抛出异常async_simple::SignalException,调用value()应返回信号类型(对于取消操作为async_simple::terminate)。 此外,下列IO函数在挂起/恢复协程时都会自动插入两个检查点判断任务是否被取消。

  1. CollectAny:CollectAny会将信号转发给所有子任务,如果收到取消信号,会抛出异常立即返回。
  2. CollectAll:CollectAny会将信号转发给所有子任务,即使收到取消信号,自身依然会等待所有子任务执行完毕后正常返回。
  3. Yield/SpinLock:如果被取消,会抛出异常。目前暂不支持取消在调度器中排队的任务。
  4. Sleep: 依赖于调度器是否重写了虚函数void schedule(Func func, Duration dur, uint64_t schedule_info, Slot *slot = nullptr),并正确实现了取消功能。如果未重写该函数,默认实现支持取消Sleep。

以下IO对象与函数暂未支持取消操作,有待后续完善。

  1. Mutex
  2. ConditionVariable
  3. SharedMutex
  4. Latch
  5. Promise/Future
  6. CountingSemaphore

自定义Awaiter如何支持取消

用户在实现自己的IO函数时,也需要适配支持取消功能。async_simple提供了signalHelper{terminate}.hasCanceled()(用于协程的await_ready()),signalHelper{terminate}.tryEmplace()(用于协程的await_suspend())和signalHelper{terminate}.checkHasCanceled()(用于协程的await_resume())来简化用户代码。

下面我们提供一个基于异步定时器实现的协程Sleep实现样例,该函数支持通过取消信号来中断Sleep:

cpp
using Duration = std::chrono::millseconds;
class TimeAwaiter {
public:
    TimeAwaiter(Duration dur, Slot *slot)
        : _asyncTimer(...), _dur(dur), _slot(slot) {}

public:
    bool await_ready() const noexcept { 
      // check if canceled before suspend (if canceled, coroutine will call await_resume() immediately)
      return signalHelper{terminate}.hasCanceled(_slot); 
    }

    void await_suspend(std::coroutine_handle<> handle) {
        _asyncTimer.sleep_for(_dur, [](auto&&){
          handle.resume();
        })
        bool hasnt_canceled = signalHelper{terminate}.tryEmplace(
            slot, [this](SignalType, Signal*) {
                _asyncTimer.cancel();
            });
        if (!hasnt_canceled) { // has canceled
          _asyncTimer.cancel();
        }
    }

    // check if canceled after suspend (if canceled, throw SignalException), if not canceled, clear slot function.
    void await_resume() { 
      signalHelper{terminate}.checkHasCanceled(_slot); 
    }

    // helper function to speed-up Lazy co_await (it will ignore lazy's executor)
    auto coAwait(Executor *) {
        return *this;
    }

private:
    AsyncTimer _asyncTimer;
    Duration _dur;
    Slot *_slot;
};

// throw SignalException if canceled.
template <typename Rep, typename Period>
Lazy<void> my_sleep(Duration dur) {
    co_return co_await TimeAwaiter{dur,co_await CurrentSlot{}};
}

有栈协程支持

有栈协程暂未对取消做特殊支持,待后续完善。

This website is released under the MIT License.