在Rust中实现gRPC重试

问题

CITA-Cloud采用了微服务架构,微服务之间以及对应用暴露的接口都采用了gRPC

gRPC调用时可能会返回错误,需要对错误进行处理。经过讨论之后,我们采用的方案是将错误码分成两层。应用层面的错误用单独的status_code来表示;gRPC本身的错误会用响应中的Status来表示。

前段时间碰到了可能是网络抖动造成的客户端返回UNAVAILABLE的现象。因为这个是gRPC本身的错误,应用层没办法处理,只能靠客户端重试来解决。

gRPC重试

针对这个需求,可以使用gRPC本身就提供的拦截器功能。通过注入一个拦截器,检查每次调用的结果,如果返回错误(并不是每种错误都可以重试的,具体参见官方关于错误码的描述),则再次发起调用。

golang这样的亲儿子上,甚至已经有现成的库可以非常方便的做到这样的事情。

gRPC重试 in Rust

因为CITA-Cloud主要使用Rust,结果搜索了一圈,震惊的发现竟然没有现成的库。在Rust这样造轮子情绪高涨的社区里,这是一个很不寻常的情况。

所幸一番搜索之后,发现了相应的原因,还是跟Rust的所有权特性有关系。

因为要在失败后重试,就要复制一份调用的请求参数。这个在其他语言里面根本不是个事,但是在Rust里就麻烦了。

tonic(一个纯RustgRPC实现)中一个接口的客户端函数原型为:

1
2
3
4
pub async fn store(
&mut self,
request: impl tonic::IntoRequest<super::Content>,
) -> Result<tonic::Response<super::super::common::StatusCode>, tonic::Status>

请求的类型是tonic::IntoRequest<T>(其中的T为请求中的应用层数据结构),这个类型是没有实现Clone的。

至于为什么不实现,开发者的解释是要考虑到gRPCstream模式,stream中的请求是没法Clone的。

那非stream模式可以实现吗?答案也是不行,因为gRPC是基于Http2的,Http2总是stream的,因此单次调用模式其实就是只包含一个请求的stream

解决方案

请求的类型tonic::IntoRequest<T>无法Clone,但是里面的T通常都是可以Clone的。

因此在Rust中像golang一样通过拦截器来非常优雅的实现重试是做不到了,但是用复杂一点的方法还是可以实现的。

其实说白了就是在应用层,按照最直接的方式来实现重试。在应用层多封装一层函数,其参数是应用层的请求类型T。调用接口之后,判断结果,如果是可重试的错误,则将类型T复制一份,重新发起调用。

当然这样实现的问题是重复的模式化的代码会非常多,所以具体实现还是用了一些技巧尽量让重复的代码少一点。

方案参考了temporalio/sdk-core,具体实现参见代码

为了复用retry的逻辑,单独抽象出了retry模块。首先定义了RetryClient

1
2
3
4
pub struct RetryClient<SG> {
client: SG,
retry_config: RetryConfig,
}

其中client是原始的gRPC clientretry_config是重试相关的选项,比如最多重试多少次等。

重试的逻辑在其成员方法call_with_retry中,里面主要用到了FutureRetry,即把整个调用封装成一个Future闭包,退避策略则使用了ExponentialBackoff

当然最根本的还是前面提到的,要封装一层,使闭包的参数是可以Clone的。这部分都是一些模式化的代码,因此使用了一个宏来自动生成相关代码:

1
2
3
4
5
6
7
8
macro_rules! retry_call {
($myself:ident, $call_name:ident) => { retry_call!($myself, $call_name,) };
($myself:ident, $call_name:ident, $($args:expr),*) => {{
let call_name_str = stringify!($call_name);
let fact = || { async { $myself.get_client_clone().$call_name($($args,)*).await.map(|ret| ret.into_inner()) }};
$myself.call_with_retry(fact, call_name_str).await
}}
}

为了让RetryClient能够用于不同的Service,这里会把每个Service的客户端函数定义成一个Trait。比如:

1
2
3
4
5
6
7
8
#[async_trait::async_trait]
pub trait StorageClientTrait {
async fn store(&self, content: storage::Content) -> Result<common::StatusCode, tonic::Status>;

async fn load(&self, key: storage::ExtKey) -> Result<storage::Value, tonic::Status>;

async fn delete(&self, key: storage::ExtKey) -> Result<common::StatusCode, tonic::Status>;
}

注意这里的函数原型是封装之后的。

然后为RetryClient相对应的特化类型实现这个Trait

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#[async_trait::async_trait]
impl StorageClientTrait for RetryClient<StorageServiceClient<InterceptedSvc>> {
async fn store(&self, content: storage::Content) -> Result<common::StatusCode, tonic::Status> {
retry_call!(self, store, content.clone())
}

async fn load(&self, key: storage::ExtKey) -> Result<storage::Value, tonic::Status> {
retry_call!(self, load, key.clone())
}

async fn delete(&self, key: storage::ExtKey) -> Result<common::StatusCode, tonic::Status> {
retry_call!(self, delete, key.clone())
}
}

内容是完全使用前面的宏来实现的。第一个参数是RetryClientself,第二个参数是gRPC接口的名称,后面是接口的参数。

这样就实现了一个尽量通用的RetryClient,然后以尽量少的重复代码来为多个Service都实现了重试的功能。

用法可以参见里面的测试代码。

1
2
3
let mock_client = TestClient::new(code);
let retry_client = RetryClient::new(mock_client, Default::default());
let result = retry_client.test(1).await;

首先按照原有的方法获取底层的Client;然后将其和RetryConfig一起放入RetryClient,得到带重试功能的客户端;用这个客户端调用前述Trait中封装的方法就会自带重试功能了。