backon 是一个 Rust 错误重试库,今天这篇文章旨在跟分享我在实现它的过程中一些技巧~
缘起
Apache OpenDAL 实现 RetryLayer 时需要提供一种 backoff 机制,以实现指数退避和 jitter 等特性。虽然我已经通过简单的搜索找到了 backoff
,但我并不十分满意。首先,我注意到这个库的维护状况似乎不太好,有 4 个未合并的PR,而且主分支上一次更新是在 2021 年。其次,我不喜欢它提供的API:
async fn fetch_url(url: &str) -> Result<String, reqwest::Error> {
retry(ExponentialBackoff::default(), || async { fetch().await }).await
}
backoff 的实现并不复杂,为什么不自己造一个用起来舒服的呢?
设计
我头脑中第一个想法是使用 Iterator<Item = Duration>
来表示 backoff。任何能够返回 Duration
类型的 iterator
都可以作为 backoff
使用。使用 iterator
来表示 backoff
具有非常直接和清晰的含义,使用者可以轻松地理解和上手实现,而无需阅读每个函数的注释。其次,我希望为 backoff 提供类似于 Rust 原生函数的使用体验:
async fn fetch_url(url: &str) -> Result<String, reqwest::Error> {
fetch.retry(ExponentialBackoff::default()).await
}
看起来很不错:简单直接,不打乱用户的阅读顺序,一眼能定位业务逻辑位置,让我们着手实现它吧!
实现
首先,我们需要了解的是,Rust中的async函数本质上都是生成器(generator)。这些生成器会捕获当前环境的变量,并生成一个匿名的 Future。如果要重试一个 async 函数,我们需要再次调用这个生成器来生成一个全新的Future 来执行。
我曾经走过的弯路是 Failed demo for retry: we can't retry a future directly ,当时我天真地想直接重试一个 TryFuture
:
pub trait Retryable<B: Policy, F: Fn(&Self::Error) -> bool>: TryFuture + Sized {
fn retry(self, backoff: B, handle: F) -> Retry<Self, B, F>;
}
现在我明白了这种做法是错误的。一旦 Future
进入 Poll::Ready
状态,我们就不应该再去轮询它,这也正如文档所描述的:
Once a future has completed (returned
Ready
frompoll
), calling itspoll
method again may panic, block forever, or cause other kinds of problems
接下来需要调整自己的思路,针对 || -> impl Future<Result<T>>
来实现。首先我定义了一个 Retryable trait
并为所有的 FnMut() -> Fut
实现:
pub trait Retryable<
B: BackoffBuilder,
T,
E,
Fut: Future<Output = Result<T, E>>,
FutureFn: FnMut() -> Fut,
>
{
/// Generate a new retry
fn retry(self, builder: &B) -> Retry<B::Backoff, T, E, Fut, FutureFn>;
}
impl<B, T, E, Fut, FutureFn> Retryable<B, T, E, Fut, FutureFn> for FutureFn
where
B: BackoffBuilder,
Fut: Future<Output = Result<T, E>>,
FutureFn: FnMut() -> Fut,
{
fn retry(self, builder: &B) -> Retry<B::Backoff, T, E, Fut, FutureFn> {
Retry::new(self, builder.build())
}
}
这个 trait 涉及到一下类型参数:
B: BackoffBuilder
: 用户传的 backoff builder,用于指定不同的 backoff 参数FutureFn: FnMut() -> Fut
:表示其类型是返回 Fut 的函数FnOnce
要求 take ownership,不能多次调用- 而
Fn
只能拿到&self
引用,很多场景下使用会受限
Fut: Future<Output = Result<T, E>>
:这表示一个 Future,它返回的类型是Result<T, E>
返回的 Retry
结构体则包装了上述这些所有类型:
pub struct Retry<B: Backoff, T, E, Fut: Future<Output = Result<T, E>>, FutureFn: FnMut() -> Fut> {
backoff: B,
retryable: fn(&E) -> bool,
notify: fn(&E, Duration),
future_fn: FutureFn,
#[pin]
state: State<T, E, Fut>,
}
除了 backoff
和 future_fn
之外,我们引入了 retryable
和 notify
用来实现 retryable error 检查和通知功能。类型系统想清楚之后,接下来的工作就是给 Retry 实现正确的 Future
trait 了,细节不再赘述:
impl<B, T, E, Fut, FutureFn> Future for Retry<B, T, E, Fut, FutureFn>
where
B: Backoff,
Fut: Future<Output = Result<T, E>>,
FutureFn: FnMut() -> Fut,
{
type Output = Result<T, E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
...
}
}
此外,还有一些事务性的工作需要完成:我们需要让用户定义哪些Error是可以进行重试的,并且需要提供自定义通知重试的功能。
最后组合起来的效果如下:
#[tokio::main]
async fn main() -> Result<()> {
let content = fetch
.retry(&ExponentialBuilder::default())
.when(|e| e.to_string() == "EOF")
.notify(|err, dur| {
println!("retrying error {:?} with sleeping {:?}", err, dur);
})
.await?;
Ok(())
}
看起来很完美!
One More Thing
哦,等一等,backon 还不支持同步函数!没关系,我们只需要应用相同的思路:
pub trait BlockingRetryable<B: BackoffBuilder, T, E, F: FnMut() -> Result<T, E>> {
/// Generate a new retry
fn retry(self, builder: &B) -> BlockingRetry<B::Backoff, T, E, F>;
}
impl<B, T, E, F> BlockingRetryable<B, T, E, F> for F
where
B: BackoffBuilder,
F: FnMut() -> Result<T, E>,
{
fn retry(self, builder: &B) -> BlockingRetry<B::Backoff, T, E, F> {
BlockingRetry::new(self, builder.build())
}
}
由于 fn_traits 特性还没有 stable,所以我选择给 BlockingRetry 增加了一个新的函数:
impl<B, T, E, F> BlockingRetry<B, T, E, F>
where
B: Backoff,
F: FnMut() -> Result<T, E>,
{
pub fn call(mut self) -> Result<T, E> {
...
}
}
在 call 中完成重试的操作,用起来感觉也很不错,跟 Async 的版本有一种相呼应的美感。
fn main() -> Result<()> {
let content = fetch
.retry(&ExponentialBuilder::default())
.when(|e| e.to_string() == "EOF")
.notify(|err, dur| {
println!("retrying error {:?} with sleeping {:?}", err, dur);
})
.call()?;
Ok(())
}
总结
在本文中,我分享了 backon
的设计和具体实现。在这个过程中,我主要使用了Rust的泛型机制,分别为 FnMut() -> Fut
和 FnMut() -> Result<T, E>
来实现了自定义 trait 来增加新的功能。我希望这个实现能够启发大家设计更加用户友好的库 API。
感谢大家的阅读!