backon 是一个 Rust 错误重试库,今天这篇文章旨在跟分享我在实现它的过程中一些技巧~

缘起

Apache OpenDAL 实现 RetryLayer 时需要提供一种 backoff 机制,以实现指数退避和 jitter 等特性。虽然我已经通过简单的搜索找到了 backoff,但我并不十分满意。首先,我注意到这个库的维护状况似乎不太好,有 4 个未合并的PR,而且主分支上一次更新是在 2021 年。其次,我不喜欢它提供的API:

async fn fetch_url(url: &str) -> Result<String, reqwest::Error> {
    retry(ExponentialBackoff::default(), || async { fetch().await }).await
}

backoff 的实现并不复杂,为什么不自己造一个用起来舒服的呢?

设计

我头脑中第一个想法是使用 Iterator<Item = Duration> 来表示 backoff。任何能够返回 Duration 类型的 iterator 都可以作为 backoff 使用。使用 iterator 来表示 backoff 具有非常直接和清晰的含义,使用者可以轻松地理解和上手实现,而无需阅读每个函数的注释。其次,我希望为 backoff 提供类似于 Rust 原生函数的使用体验:

async fn fetch_url(url: &str) -> Result<String, reqwest::Error> {
  	fetch.retry(ExponentialBackoff::default()).await
}

看起来很不错:简单直接,不打乱用户的阅读顺序,一眼能定位业务逻辑位置,让我们着手实现它吧!

实现

首先,我们需要了解的是,Rust中的async函数本质上都是生成器(generator)。这些生成器会捕获当前环境的变量,并生成一个匿名的 Future。如果要重试一个 async 函数,我们需要再次调用这个生成器来生成一个全新的Future 来执行。

我曾经走过的弯路是 Failed demo for retry: we can't retry a future directly ,当时我天真地想直接重试一个 TryFuture:


pub trait Retryable<B: Policy, F: Fn(&Self::Error) -> bool>: TryFuture + Sized {
    fn retry(self, backoff: B, handle: F) -> Retry<Self, B, F>;
}

现在我明白了这种做法是错误的。一旦 Future 进入 Poll::Ready 状态,我们就不应该再去轮询它,这也正如文档所描述的:

Once a future has completed (returned Ready from poll), calling its poll method again may panic, block forever, or cause other kinds of problems

接下来需要调整自己的思路,针对 || -> impl Future<Result<T>> 来实现。首先我定义了一个 Retryable trait 并为所有的 FnMut() -> Fut 实现:

pub trait Retryable<
    B: BackoffBuilder,
    T,
    E,
    Fut: Future<Output = Result<T, E>>,
    FutureFn: FnMut() -> Fut,
>
{
    /// Generate a new retry
    fn retry(self, builder: &B) -> Retry<B::Backoff, T, E, Fut, FutureFn>;
}

impl<B, T, E, Fut, FutureFn> Retryable<B, T, E, Fut, FutureFn> for FutureFn
where
    B: BackoffBuilder,
    Fut: Future<Output = Result<T, E>>,
    FutureFn: FnMut() -> Fut,
{
    fn retry(self, builder: &B) -> Retry<B::Backoff, T, E, Fut, FutureFn> {
        Retry::new(self, builder.build())
    }
}

这个 trait 涉及到一下类型参数:

  • B: BackoffBuilder: 用户传的 backoff builder,用于指定不同的 backoff 参数
  • FutureFn: FnMut() -> Fut:表示其类型是返回 Fut 的函数
    • FnOnce 要求 take ownership,不能多次调用
    • Fn 只能拿到 &self 引用,很多场景下使用会受限
  • Fut: Future<Output = Result<T, E>>:这表示一个 Future,它返回的类型是 Result<T, E>

返回的 Retry 结构体则包装了上述这些所有类型:

pub struct Retry<B: Backoff, T, E, Fut: Future<Output = Result<T, E>>, FutureFn: FnMut() -> Fut> {
    backoff: B,
    retryable: fn(&E) -> bool,
    notify: fn(&E, Duration),
    future_fn: FutureFn,

    #[pin]
    state: State<T, E, Fut>,
}

除了 backofffuture_fn 之外,我们引入了 retryablenotify 用来实现 retryable error 检查和通知功能。类型系统想清楚之后,接下来的工作就是给 Retry 实现正确的 Future trait 了,细节不再赘述:

impl<B, T, E, Fut, FutureFn> Future for Retry<B, T, E, Fut, FutureFn>
where
    B: Backoff,
    Fut: Future<Output = Result<T, E>>,
    FutureFn: FnMut() -> Fut,
{
    type Output = Result<T, E>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        ...
    }
}

此外,还有一些事务性的工作需要完成:我们需要让用户定义哪些Error是可以进行重试的,并且需要提供自定义通知重试的功能。

最后组合起来的效果如下:

#[tokio::main]
async fn main() -> Result<()> {
    let content = fetch
        .retry(&ExponentialBuilder::default())
  		.when(|e| e.to_string() == "EOF")
        .notify(|err, dur| {
            println!("retrying error {:?} with sleeping {:?}", err, dur);
        })
        .await?;

    Ok(())
}

看起来很完美!

One More Thing

哦,等一等,backon 还不支持同步函数!没关系,我们只需要应用相同的思路:

pub trait BlockingRetryable<B: BackoffBuilder, T, E, F: FnMut() -> Result<T, E>> {
    /// Generate a new retry
    fn retry(self, builder: &B) -> BlockingRetry<B::Backoff, T, E, F>;
}

impl<B, T, E, F> BlockingRetryable<B, T, E, F> for F
where
    B: BackoffBuilder,
    F: FnMut() -> Result<T, E>,
{
    fn retry(self, builder: &B) -> BlockingRetry<B::Backoff, T, E, F> {
        BlockingRetry::new(self, builder.build())
    }
}

由于 fn_traits 特性还没有 stable,所以我选择给 BlockingRetry 增加了一个新的函数:

impl<B, T, E, F> BlockingRetry<B, T, E, F>
where
    B: Backoff,
    F: FnMut() -> Result<T, E>,
{
  pub fn call(mut self) -> Result<T, E> {
	...
  }
}

在 call 中完成重试的操作,用起来感觉也很不错,跟 Async 的版本有一种相呼应的美感。

fn main() -> Result<()> {
    let content = fetch
        .retry(&ExponentialBuilder::default())
  		.when(|e| e.to_string() == "EOF")
        .notify(|err, dur| {
            println!("retrying error {:?} with sleeping {:?}", err, dur);
        })
        .call()?;

    Ok(())
}

总结

在本文中,我分享了 backon 的设计和具体实现。在这个过程中,我主要使用了Rust的泛型机制,分别为 FnMut() -> FutFnMut() -> Result<T, E> 来实现了自定义 trait 来增加新的功能。我希望这个实现能够启发大家设计更加用户友好的库 API。

感谢大家的阅读!