我们常常需要在一个异步任务执行完毕时,取消与其有关的另外一些异步任务。例如,我们可能希望一个IO请求超时后能及时终止。async-simple基于协作式的信号-槽模型,提供了一套通用,线程安全且高效的异步任务取消机制,并支持结构化的任务取消。
Signal 与 Slot
async-simple提供了如下信号类型,每一个bit代表一种信号:
enum class SignalType : uint64_t {
none = 0,
// 低32bit的信号只能触发一次。
terminal = 0b1, // 最低的1bit代表终止信号。
// 2-16bit为async-simple保留位。
// 17-32bit给用户自定义扩展。
// 高32bit的信号可以多次触发(多次调用信号处理函数)。
// 33-48bit为async-simple保留位。
// 49-64bit给用户自定义扩展。
all = 0xffff'ffff'ffff'ffff, // 默认过滤级别(不过滤任何信号)
};
Signal
类型用于发起一个信号,而Slot
则用于接收信号。我们可以通过工厂方法创建一个信号,将多个Slot
绑定到同一个Signal
上。当Signal
触发信号时,所有Slot
都会收到对应的信号。
最简单的办法是手动查询信号是否已触发:
// 通过工厂方法创建Signal
std::shared_ptr<Signal> signal = Signal::create();
std::vector<std::future<void>> works;
for (int i=0;i<10;++i) {
// 为每一个异步任务创建Slot
auto slot = std::make_unique<Slot>(signal.get());
// 异步执行任务
std::async(std::launch::async, [slot=std::move(slot)] {
// 手动轮询取消状态
while (!slot->hasTriggered(SignalType::terminate)) {
// ...
}
return;
});
}
// ...
// 提交取消信号
signal->emit(SignalType::terminate);
for (auto &e:works)
e.get();
除了直接查询取消状态,我们可以在Slot
中注册信号处理函数来接受信号。信号处理函数的签名应为void(SignalType, Signal*)
。第一个参数SignalType代表经过滤后,本次成功触发的信号类型,第二个参数是指向信号的指针。
需要注意的是:
- 信号处理函数不应该阻塞。调用
emit()
函数并触发信号时,程序会遍历绑定在Signal
上的信号处理函数并立即执行。 - 注意线程安全问题:信号处理函数会由调用
emit()
的线程执行,32号以上的信号处理函数还可能由多个线程并发执行。 - 信号处理函数禁止持有槽绑定的信号,这会导致信号的内存泄漏。用户应通过
Signal*
参数访问信号。
例如,下面这段代码通过信号回调函数取消睡眠。
// 通过工厂方法创建Signal
std::shared_ptr<Signal> signal = Signal::create();
std::vector<std::future<void>> works;
for (int i=0;i<10;++i) {
// 为每一个异步任务创建槽
auto slot = std::make_unique<Slot>(signal.get());
// 异步执行任务
std::async(std::launch::async, [slot=std::move(slot)] {
std::unique_ptr<std::promise<void>> p;
auto f = p->get_future();
book ok = slot->emplace(SignalType::terminate, // 代表会触发该回调函数的bit位,该enum只能有一个bit位为1,如果提交的信号该位为1,则会触发回调处理函数。
[p=std::move(p)](SignalType, Signal*){
p->set_value();
});
if (ok) { //如果取消信号还未触发
f.wait_for(1s*(rand()+1)); // 除非取消信号被触发,否则睡觉睡眠一段时间。
}
slot->clear(); // 清除回调函数
if (slot->signal()) { //如果槽被绑定在信号上
slot->signal()->emit(SignalType::terminate); // 触发取消信号。
}
return;
});
}
// ...
// 第一个异步定时任务结束后其他任务也会一起结束。
for (auto &e:works)
e.get();
上述代码中,我们通过调用Slot
的signal()
函数可以获取到指向Signal
的指针。我们保证该函数返回的指针总是合法的,因为Signal
的生命周期会被自动延长到最后一个绑定的槽析构之时。
上述代码可能会多次调用emit
函数尝试触发取消信号,emit
调用会返回本次成功触发的信号。
class Signal
: public std::enable_shared_from_this<Signal> {
public:
// 提交信号(允许一次提交多种信号),并返回本次请求成功触发的信号,线程安全。
SignalType emit(SignalType state) noexcept;
// 获取当前的信号,线程安全。
SignalType state() const noexcept;
// 创建信号的工厂方法,返回信号的shared_ptr,线程安全。
static std::shared_ptr<Signal> create();
};
Signal
的所有操作都是线程安全的。然而,由于各异步任务应持有各自的Slot
,因此槽对象不是线程安全的。我们禁止用户并发调用同一个Slot
的公共接口。
class Slot {
// 将`Signal`与`Slot`绑定在一起。可以指定信号过滤级别,如果提交的信号type和filter做与运算的结果为0,则信号不会被触发。
Slot(Signal* signal,
SignalType filter = SignalType::all);
// 注册信号处理函数。用户可以多次注册信号处理函数,第二次注册会覆盖之前的信号处理函数。
// SingalType: 代表该信号处理函数会响应的bit位。该enum只能有一个bit位为1,如果提交的信号该位为1,则会触发回调处理函数。
// 返回false: 取消信号已经触发。
template <typename... Args>
[[nodiscard]] bool emplace(SignalType type, Args&&... args);
// 清空信号处理函数,如果返回false说明信号处理函数已经被执行,或尚未注册过信号。
bool clear();
// 在指定作用域内过滤信号,如果取消信号type & filter为0,则信号type在该作用域内不会被触发。
// 允许嵌套的添加filter。
[[nodiscard]] FilterGuard setScopedFilter(SignalType filter);
// 获取当前作用域的filter
SignalType getFilter();
// 判断信号是否处于触发状态(如果被过滤会返回false)。
bool hasTriggered(SignalType) const noexcept;
// 该函数返回`Slot`对应的`Signal`,如果在构造槽之前信号已经触发,则该函数返回nullptr。否则始终返回一个有效的`Signal`指针。这是因为slot持有对应的signal的所有权。如果想要延长signal的生命周期,可以调用signal()->shared_from_this(),也可以用signal启动一个新协程。
Signal* signal() const noexcept;
};
我们可以将多个Signal
串在一起。当某个Signal
被触发时,该信号会被转发给串联的其他子Signal
。
std::shared_ptr<Signal> signal = Signal::create();
auto slot = std::make_unique<Slot>(signal.get());
std::shared_ptr<Signal> chainedSignal = Signal::create();
slot->addChainedSignal(chainedSignal);
signal->emit(SignalType::terminate);
assert(chainedSignal->state()==SignalType::terminate);
// 信号会被转发给chainedSignal
// 然而,chainedSignal触发的信号不会触发给signal
chainedSignal->emit(static_cast<SignalType>(0b10));
assert(signal->state()!=static_cast<SignalType>(0b10));
我们还可以通过继承Signal
来为信号处理函数提供更多上下文信息:
class MySignal : public Signal {
using Signal::Signal;
public:
std::atomic<size_t> myState;
};
auto mySignal = Signal::create<MySignal>();
auto slot = std::make_unique<Slot>(mySignal->get());
slot->emplace([](SignalType type, Signal* signal) {
auto mySignal = dynamic_cast<MySignal*>(signal);
std::cout << "myState:" << mySignal->myState << std::endl;
});
mySignal->myState=1;
mySignal->emit(SignalType::terminate);
无栈协程支持
上述的槽与信号是较为低级的通用api,可适配各种异步场景。在async-simple无栈协程库Lazy
中,提供了一系列高级封装支持,使得用户无需关心细节,也能实现结构化并发任务的取消操作。
collect函数与结构化并发
在一般用户代码中,我们建议使用collectAny
和collectAll
来实现结构化并发任务的取消操作。
collectAny
collectAny
可以并发执行多个协程,并等待第一个协程返回。collectAny
会自动将这些协程绑定到一个取消信号,并在第一个协程执行完毕时,向其他尚未执行完毕的协程发送取消信号,从而结束这些任务。
例如,下面是使用collectAny
实现通用的超时处理逻辑的代码。如果async_read()在1s内没有返回,则sleep_1s()会先返回并取消async_read()。
Lazy<void> sleep_1s();
Lazy<std::string> async_read();
auto res = co_await collectAny<SignalType::terminate>(async_read(),sleep_1s());
if (std::get<res>() == 1) { // timed out!
// ...
}
else {
// ...
}
collectAny
支持用户发送不同的取消信号。默认发送的取消信号是SignalType::none
, 这会导致async_read()
任务未被取消并继续执行。如果想让collectAny
结束时取消其他任务,则可以选择发送信号SignalType::terminate
。
Lazy<void> work1();
Lazy<void> work2();
auto res = co_await collectAny<SignalType::none>(async_read(),sleep_1s());
if (std::get<res>() == 0) {
// work1 finished, work2 will still working
}
else {
// work2 finished, work1 will still working
}
collectAll
和collectAny
相同,collectAll
也可以在第一个任务结束时发送一次信号。(默认情况下不发送信号)
Lazy<int> work1();
Lazy<std::string> work2();
// work1(), work2() all finished, no cancel
auto res = co_await collectAll(work1(),work2());
// work1(), work2() all finished, the later work will be canceled by SignalType::terminate
auto res = co_await collectAll<SignalType::terminate>(work1(),work2());
和collectAny不同,collectAll会等待协程执行完毕,并获取返回值。一方面,这简化了异步任务的生命周期。另一方面,如果取消信号没能中止任务,collectAll
会等待其他任务执行完毕。
http_client client;
// 和collectAny不同, 我们无需通过引用计数延长client的生命周期,因为collectAll保证一定会等待http_client::connect协程返回。
auto res = co_await collectAll<SignalType::terminate>(client.connect("localhost:8080"), sleep(1s));
//如果不发送取消信号,collectAll会一直执行,直到时间超过一秒且客户端完成连接
auto res = co_await collectAll(client.connect("localhost:8080"), sleep(1s));
信号槽的传递与获取
一个Lazy
可以持有一个Slot
, 当我们想要为一个异步任务绑定取消信号时,只需要在任务开始阶段,通过Lazy<T>::setLazyLocal
来绑定信号,它会随着co_await
一路传递下去。需要注意的是,信号只能绑定一次。
我们可以通过co_await CurrentSlot{}
来获取指向Slot
的指针,调用co_await ForbidSignal{}
会让协程调用链与取消信号解绑,从而防止后续任务被中止。它会析构对应的Slot
对象,并且之后调用co_await CurrentSlot{}
会返回nullptr。
Lazy<void> subTask() {
Slot* slot = co_await CurrentSlot{};
assert(slot!=nullptr);
co_await ForbidSignal{};
// slot is illegal now.
assert(co_await CurrentSlot{} == nullptr);
co_returnl
}
Lazy<void> Task() {
Slot* slot = co_await CurrentSlot{};
assert(slot!=nullptr);
co_await subTask();
assert(co_await CurrentSlot{} == nullptr);
co_return;
}
auto signal = Signal::create();
syncAwait(Task().setLazyLocal(signal.get()).via(ex));
支持取消操作与信号转发的对象与函数
除了手动判断取消信号是否被触发,async-simple许多可能挂起的函数都为取消操作提供了支持。此外,collect*
函数支持将外部收到的信号转发给由collect*
函数启动的协程。
以下各对象与函数支持取消操作,响应信号的协程可能抛出异常async_simple::SignalException
,调用value()
应返回信号类型(对于取消操作为async_simple::terminate
)。 此外,下列IO函数在挂起/恢复协程时都会自动插入两个检查点判断任务是否被取消。
- CollectAny:CollectAny会将信号转发给所有子任务,如果收到取消信号,会抛出异常立即返回。
- CollectAll:CollectAny会将信号转发给所有子任务,即使收到取消信号,自身依然会等待所有子任务执行完毕后正常返回。
- Yield/SpinLock:如果被取消,会抛出异常。目前暂不支持取消在调度器中排队的任务。
- Sleep: 依赖于调度器是否重写了虚函数
void schedule(Func func, Duration dur, uint64_t schedule_info, Slot *slot = nullptr)
,并正确实现了取消功能。如果未重写该函数,默认实现支持取消Sleep。
以下IO对象与函数暂未支持取消操作,有待后续完善。
- Mutex
- ConditionVariable
- SharedMutex
- Latch
- Promise/Future
- CountingSemaphore
自定义Awaiter如何支持取消
用户在实现自己的IO函数时,也需要适配支持取消功能。async_simple提供了signalHelper{terminate}.hasCanceled()
(用于协程的await_ready()
),signalHelper{terminate}.tryEmplace()
(用于协程的await_suspend()
)和signalHelper{terminate}.checkHasCanceled()
(用于协程的await_resume()
)来简化用户代码。
下面我们提供一个基于异步定时器实现的协程Sleep实现样例,该函数支持通过取消信号来中断Sleep:
using Duration = std::chrono::millseconds;
class TimeAwaiter {
public:
TimeAwaiter(Duration dur, Slot *slot)
: _asyncTimer(...), _dur(dur), _slot(slot) {}
public:
bool await_ready() const noexcept {
// check if canceled before suspend (if canceled, coroutine will call await_resume() immediately)
return signalHelper{terminate}.hasCanceled(_slot);
}
void await_suspend(std::coroutine_handle<> handle) {
_asyncTimer.sleep_for(_dur, [](auto&&){
handle.resume();
})
bool hasnt_canceled = signalHelper{terminate}.tryEmplace(
slot, [this](SignalType, Signal*) {
_asyncTimer.cancel();
});
if (!hasnt_canceled) { // has canceled
_asyncTimer.cancel();
}
}
// check if canceled after suspend (if canceled, throw SignalException), if not canceled, clear slot function.
void await_resume() {
signalHelper{terminate}.checkHasCanceled(_slot);
}
// helper function to speed-up Lazy co_await (it will ignore lazy's executor)
auto coAwait(Executor *) {
return *this;
}
private:
AsyncTimer _asyncTimer;
Duration _dur;
Slot *_slot;
};
// throw SignalException if canceled.
template <typename Rep, typename Period>
Lazy<void> my_sleep(Duration dur) {
co_return co_await TimeAwaiter{dur,co_await CurrentSlot{}};
}
有栈协程支持
有栈协程暂未对取消做特殊支持,待后续完善。