Skip to content
On this page

coro_rpc客户端介绍

基本使用

coro_rpc::coro_rpc_client是coro_rpc的客户端,用户可以通过它向服务器发送rpc请求。

下面我们展示rpc_client的基本使用方法。

cpp
using namespace async_simple;
using namespace coro_rpc;
int add(int a,int b);
Lazy<void> example() {
  coro_rpc_client client;
  coro_rpc::err_code ec = co_await client.connect("localhost:9001");
  if (ec) { /*判断连接是否出错*/
    std::cout<<ec.message()<<std::endl;
    co_return;
  }
  rpc_result result = co_await client.call<add>(1,2);
  /*rpc_result是一个expected<T,rpc_error>类型,其中T为rpc返回值*/
  if (!result.has_value()) {
    /*调用result.error()获取rpc错误信息*/
    std::cout<<"error code:"<<result.error().val()<< ", error message:"<<result.error().msg()<<std::endl;
    co_return;
  }
  assert(result.value()==3); /*调用result.value()获取rpc返回值*/
}

我们可以通过set_req_attachment函数设置本次请求的attachment,这是一段不会经过序列化直接发送给客户端的二进制数据。同样的,我们也可以使用get_resp_attachment()release_resp_attachment()来获取rpc请求返回的attachment。

cpp
using namespace async_simple;
using namespace coro_rpc;
void attachment_echo() {
  auto ctx=coro_rpc::get_context();
  ctx->set_resp_attachment(ctx->get_req_attachment());
}
Lazy<std::string> example(coro_rpc_client& client, std::string_view attachment) {
  client.set_req_attachment(attachment);
  rpc_result result = co_await client.call<attachment_echo>();
  if (result.has_value()) {
    assert(result.get_resp_attachment()==attachment);
    co_return std::move(result.release_resp_attachment());
  }
  co_return "";
}

默认情况下,rpc客户端发送请求/建立连接后会等待5秒,如果5秒后仍未收到响应,则会返回超时错误。 用户也可以通过调用call_for函数自定义等待的时长。

cpp
client.connect("127.0.0.1:9001", std::chrono::seconds{10});
auto result = co_await client.call_for<add>(std::chrono::seconds{10},1,2);
assert(result.value() == 3);

时长可以是任一的std::chrono::duration类型,常见的如std::chrono::seconds,std::chrono::millseconds。 特别的,如果时长为0,代表该函数调用永远也不会超时。

SSL支持

coro_rpc支持使用openssl对连接进行加密。在安装openssl并使用cmake find_package/fetch_content 将yalantinglibs导入到你的工程后,可以打开cmake选项YLT_ENABLE_SSL=ON启用ssl支持。或者,你也可以手动添加宏YLT_ENABLE_SSL并手动链接openssl。

当启用ssl支持后,用户可以调用init_ssl函数,然后再连接到服务器。这会使得客户端与服务器之间建立加密的链接。需要注意的是,coro_rpc服务端在编译时也必须启用ssl支持,并且在启动服务器之前也需要调用init_ssl方法来启用SSL支持。

cpp
client.init_ssl("./","server.crt");

第一个字符串代表SSL证书所在的基本路径,第二个字符串代表SSL证书相对于基本路径的相对路径。

当建立连接时,客户端会使用该证书校验服务端发来的证书,以避免中间人攻击。因此,客户端必须持有服务端使用的证书或其根证书。

RPC参数的转换与编译期检查

coro_rpc会在调用的时候对参数的合法性做编译期检查,比如,对于如下rpc函数:

cpp
inline std::string echo(std::string str) { return str; }

接下来,当前client调用rpc函数时:

cpp
client.call<echo>(42);// The argument does not match, a compilation error occurs.
client.call<echo>();// Missing argument, compilation error occurs.
client.call<echo>("", 0);// There are too many arguments, a compilation error occurs.
client.call<echo>("hello, coro_rpc");// The string literal can be converted to std::string, compilation succeeds.

连接选项

coro_rpc_client提供了init_config函数,用于配置连接选项。下面这份代码列出了可配置的选项。这些选项默认均可以不填写。

cpp
using namespace coro_rpc;
using namespace std::chrono;
void set_config(coro_rpc_client& client) {
  uint64_t client_id;
  std::chrono::milliseconds connect_timeout_duration;
  std::chrono::milliseconds request_timeout_duration;
  std::string host;
  std::string port;
  std::string local_ip;
  client.init_config(config{
    .connect_timeout_duration = 5s, // 连接的超时时间
    .request_timeout_duration = 5s, // 请求超时时间
    .host = "localhost", // 服务器域名
    .port = "9001", // 服务器端口
    .local_ip = "", // 本地ip,用于指定本地通信的ip地址。
    .socket_config=std::variant<tcp_config,
                 tcp_with_ssl_config,
                 coro_io::ibverbs_config>{tcp_config{}}; // 指定底层的协议及其底层配置,目前支持tcp, ssl over tcp, rdma三种协议。
  });
}

rdma配置

ibverbs协议的配置如下:

cpp
struct ibverbs_config {
  uint32_t cq_size = 128; // 事件通知队列的最大长度
  uint32_t recv_buffer_cnt = 4;                 // 默认提交到接受队列的缓冲数目,一个缓冲区默认2m,因此一个rdma在连接成功后即占用8MB的内存。积压的接收数据越多,队列中的缓冲区也会越多,最多可以缓冲max_recv_wr*buffer_size这么多的数据(buffer_size为buffer_pool配置的缓冲区大小),此后如果上层仍不消费数据,则发送端会收到rnr错误,不断重试并等待对端消费。
  ibv_qp_type qp_type = IBV_QPT_RC;             // 默认的qp类型。
  ibv_qp_cap cap = {.max_send_wr = 1,           // 发送队列的最大长度。
                    .max_recv_wr = 32,          // 接受队列的最大长度
                    .max_send_sge = 3,          // 发送的最大地址分段数,在不启用inline data时只需要1个。使用inline data时数据可能不经过拷贝直接从原始分段地址发送,因此设置为3段(默认支持3段分散地址)。
                    .max_recv_sge = 1,          // 接受的最大地址分段数,目前的缓冲区配置下只需要1个即可。
                    .max_inline_data = 256};    // 如果发送的数据包小于inline data,且底层网卡支持该设置,则小数据包不会被拷贝到缓冲中,而是直接交给网卡发送。
  std::shared_ptr<coro_io::ib_device_t> device; // rpc使用的底层ib网卡。默认选择设备列表第一个网卡。
  std::shared_ptr<coro_io::ib_buffer_pool_t> buffer_pool; // socket使用的缓冲池,默认使用全局缓冲池。
};

可以通过下面的代码简单的启用rdma:

cpp
coro_rpc_client cli;
  cli.init_ibv(); //使用默认配置
  cli.init_ibv(ibverbs_config{}); //使用用户指定的配置

也可以在配置中启用rdma:

cpp
coro_rpc_client cli;
  cli.init_config(config{.socket_config=ibverbs_config{}})

rdma buffer池

用户可以手动选择buffer_pool:

cpp
coro_io::g_ib_buffer_pool({.buffer_size=1024}); // 修改默认的全局buffer_pool的配置
  // 手动创建连接池
  auto pool = coro_io::ib_buffer_pool_t::create(coro_io::g_ib_device(),{.buffer_size = 3* 1024 * 1024,/* 缓冲大小*/
                            .max_memory_usage = 20 * 1024 * 1024,/*最大内存占用,超过后内存分配会失败,并导致连接关闭*/
                            .idle_timeout = 5s /* 长时间未使用的buffer会被回收*/});
  coro_rpc_client cli;
  cli.init_ibv(coro_io::ibverbs_config{
    .buffer_pool=pool;
  };);

rdma设备

用户可以手动选择rdma设备:

cpp
coro_io::g_ib_device({.dev_name= "my_dev" }); // 指定默认的rdma设备
  // 手动指定rdma设备
  auto dev = std::make_shared<coro_io::ib_device_t>(coro_io::ib_config_t{.dev_name="my_dev"});
  coro_rpc_client cli;
  coro_rpc
  cli.init_ibv(coro_io::ibverbs_config{
    .device = dev
  };);

调用模型

每一个coro_rpc_client都会绑定到某个IO线程上,默认通过轮转法从全局IO线程池中选择一个连接,用户也可以手动绑定到特定的IO线程上。

cpp
auto executor=coro_io::get_global_executor();
coro_rpc_client client(executor),client2(executor);
// 两个客户端都被绑定到同一个io线程上

每次发起一个基于协程的IO任务(如connect,call,send_request),客户端内部会将IO事件提交给操作系统,当IO事件完成后,再将协程恢复到绑定的IO线程上继续执行。

例如以下代码,调用connect之后任务将切换到IO线程执行。

cpp
/*run in thread 1*/
coro_rpc_client cli;
co_await cli.connect("localhost:9001");
/*run in thread 2*/
do_something();

连接池与负载均衡

coro_io提供了连接池client_pool与负载均衡器channel。用户可以通过连接池client_pool来管理coro_rpc/coro_http连接,可以使用channel实现多个host之间的负载均衡。具体请见coro_io的文档。

连接复用

coro_rpc_client 可以通过 send_request函数实现连接复用。该函数是线程安全的,允许多个线程同时调用同一个client的 send_request方法。该函数返回值为Lazy<Lazy<async_rpc_result<T>>>.

连接复用允许我们在高并发下减少连接的个数,无需创建新的连接。同时也能提高每个连接的吞吐量。

下面是一段简单的示例代码:

cpp
using namespace coro_rpc;
using namespace async_simple::coro;
std::string_view echo(std::string_view);
Lazy<void> example(coro_rpc_client& client) {
  //向服务器发送请求
  Lazy<async_rpc_result<std::string_view>> handler = co_await client.send_request<echo>("Hello");
  async_rpc_result<std::string_view> result = co_await std::move(handler);
  if (result) {
    assert(result->result() == "Hello");
  }
  else {
    // error handle
    std::cout<<result.error().msg()<<std::endl;
  }
}

我们可以多次调用send_request实现连接复用:

cpp
using namespace coro_rpc;
using namespace async_simple::coro;
std::string echo(std::string);
Lazy<void> example(coro_rpc_client& client) {
  std::vector<Lazy<async_rpc_result<std::string>>> handlers;
  // 准备发送10个请求
  for (int i=0;i<10;++i) {
    handlers.push_back(co_await client.send_request<echo>(std::to_string(i)));
  }
  //接下来等待所有的请求返回
  std::vector<async_rpc_result<std::string>> results = co_await collectAll(std::move(handlers));
  for (int i=0;i<10;++i) {
    assert(results[i]->result() == std::to_string(i));
  }
  co_return;
}

Attachment

使用send_request方法时,由于可能同时发送多个请求,因此我们不能调用set_req_attachment方法向服务器发送attachment,同样也不能调用get_resp_attachmentrelease_resp_attachment方法来获取服务器返回的attachment。

我们可以通过调用send_request_with_attachment函数,在发送请求时设置attachment。我们也可以通过调用async_rpc_result的->get_attachment()方法和->release_buffer()方法来获取attachment。

cpp
using namespace coro_rpc;
using namespace async_simple::coro;
int add(int a, int b);
Lazy<std::string> example(coro_rpc_client& client) {
  async_rpc_result<std::string_view> result = co_await co_await client.send_request_with_attachment<add>("Hello", 1, 2);
  assert(result->result() == 3);
  assert(result->get_attachment() == "Hello");
  co_return std::move(result->release_buffer().resp_attachment_buf_);
}

执行顺序

当调用的rpc函数是协程rpc函数或回调rpc函数时,rpc请求不一定会按顺序执行,服务端可能会同时执行多个rpc请求。

例如,假如有以下代码:

cpp
using namespace async_simple::coro;
Lazy<void> sleep(int seconds) {
  co_await coro_io::sleep(1s * seconds);  // 在此处让出协程
  co_return;
}

服务器注册并启动:

cpp
using namespace coro_rpc;
void start() {
  coro_rpc_server server(/* thread = */1,/* port = */ 8801);
  server.register_handler<sleep>();
  server.start();
}

客户端连续在同一个连接上调用两次sleep函数,第一次sleep2秒,第二次sleep1秒。

cpp
using namespace async_simple::coro;
using namespace coro_rpc;
Lazy<void> call() {
  coro_rpc_client cli,cli2;
  co_await cli.connect("localhost:8801");
  co_await cli2.connect("localhost:8801");
  auto handler1 = co_await cli.send_request<sleep>(2);
  auto handler2 = co_await cli.send_request<sleep>(1);
  auto handler3 = co_await cli2.send_request<sleep>(0);
  handler2.start([](auto&&){
    std::cout<<"handler2 return"<<std::endl;
  });
  handler3.start([](auto&&){
   d::cout<<"handler3 return"<<std::endl;
  });
  co_await handler1;
  std::cout<<"handler1 return"<<std::endl;
}

正常情况下handler3会先返回,随后是handler2,最后是handler1。尽管服务器只有一个IO线程用于执行rpc函数,但协程函数会在调用coro_io::sleep时让出该协程,从而保证其他连接不会被阻塞。

socket延迟发送

当使用连接复用时,可以尝试将选项中的enable_tcp_no_delay设为false,这允许底层实现将多个小请求打包后一起发送,从而提高吞吐量,但是可能会导致延迟上升。

线程安全

对于多个coro_rpc_client实例,它们之间互不干扰,可以分别在不同的线程中安全的调用。

单个coro_rpc_client在多个线程同时调用时需要注意,只有部分成员函数是线程安全的,包括send_request(),close(),connect(),get_executor(),get_pipeline_size(),get_client_id(),get_config()等。如果用户没有重新调用connect()函数并传入endpoint或hostname,那么get_port(),get_host()函数也是线程安全的。

需要注意,call,get_resp_attachment,set_req_attachment,release_resp_attachmentinit_config函数均不是线程安全的,禁止多个线程同时调用。此时只能使用send_request实现多个线程并发请求同一个连接。

coro_rpc 性能测试

先编译coro_rpc 的bench再执行bench命令行,命令行详情:

options:
  -u, --url                   url (string [=0.0.0.0:9000])
  -s, --send_data_len         send data length (unsigned long [=8388608])
  -c, --client_concurrency    total number of http clients (unsigned int [=0])
  -m, --max_request_count     max request count (unsigned long [=100000000])
  -p, --port                  server port (unsigned short [=9000])
  -r, --resp_len              response data length (unsigned long [=13])
  -b, --buffer_size           buffer size (unsigned int [=2097152])
  -o, --log_level             Severity::INFO 1 as default, WARN is 4 (int [=1])
  -i, --enable_ib             enable ib (bool [=1])
  -d, --duration              duration seconds (unsigned int [=100000])
  -?, --help                  print this message

server侧命令行: ./bench -p 9004 -r 13 -o 5

含义:启动服务端,response的数据长度为13,日志级别为error(日志级别和easylog的日志级别对应)。

client测命令行: ./bench -u 0.0.0.0:9004 -c 100 -s 8388608 -o 5 -d 30

含义:启动100个client去压测,每次请求发送8MB数据,日志级别为error,持续时间30秒。

默认会启用ibverbs,如果希望只测试tcp则添加-i 0

性能测试完成之后将输出qps,吞吐和latency数据,测试结果类似于:

# Benchmark result
avg qps 2757 and throughput:185.04 Gb/s in duration 30
# HELP Latency(us) of rpc call help
# TYPE Latency(us) of rpc call summary
rpc call latency(us){quantile="0.500000"} 3616.000000
rpc call latency(us){quantile="0.900000"} 3712.000000
rpc call latency(us){quantile="0.950000"} 3776.000000
rpc call latency(us){quantile="0.990000"} 3872.000000
rpc call latency(us)_sum 298707968.000000
rpc call latency(us)_count 82761

This website is released under the MIT License.