Iteration 10 从 3/26 开始到 4/8 结束,为期两周。这个周期里面我觉得这些工作比较有意思:
OpenDAL 错误处理改进
OpenDAL 最开始的定位就是要做一个独立的数据层,所以错误处理也是使用了自己内部的一套,没有复用 Databend 现有库。这导致上层业务逻辑在判断的时候需要做很多额外的工作,比如在 PR ISSUE-4241: version of storage layout 中,为了处理 error 的转换,贡献者需要额外做判断。配合 OpenDAL 的 API 调整,这个周期中我将 OpenDAL 返回的错误类型修改为 srd::io::Error。主要的考量是基于两个方面:API 设计与用户体验。
首先考虑的是 API 的设计是否自洽。
在 OpenDAL v0.4 的重构中,Accessor
中的 write
函数从接收一个 BytesReader
变为返回一个 BytesWriter
:
pub trait BytesWrite: AsyncWrite + Unpin + Send { }
pub type BytesWriter = Box<dyn BytesWrite>;
async fn write(&self, args: &OpWrite) -> Result<BytesWriter> {
let _ = args;
unimplemented!()
}
BytesWriter
本质上就是一个被 Box 的 AsyncWrite
,而 AsyncWrite
中返回的 futures::io::Error
是到 std::io::Error
的 alias。
pub trait AsyncWrite {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8]
) -> Poll<Result<usize, Error>>;
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Result<(), Error>>;
fn poll_close(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Result<(), Error>>;
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>]
) -> Poll<Result<usize, Error>> { ... }
}
如果我们需要传递自己的 error 类型的话,就需要反复的进行与 io::Error
的类型转换,不仅实现上特别罗嗦,而且会导致信息的丢失。通过分析返回的 error 的情况,还发现 OpenDAL 定义的 error 实际上就是 io::Error
的子集。所以不如直接修改成返回 io::Error
,把内部 error 中的 kind 去掉,作为 context 传递。
/// ObjectError carries object related context.
///
/// # Notes
///
/// This error is used to carry context only, and should never be returned to users.
/// Please wrap in [`std::io::Error`] with correct [`std::io::ErrorKind`] instead.
#[derive(Error, Debug)]
#[error("object error: (op: {op}, path: {path}, source: {source})")]
pub(crate) struct ObjectError {
op: &'static str,
path: String,
source: anyhow::Error,
}
impl ObjectError {
pub fn new(op: &'static str, path: &str, source: impl Into<anyhow::Error>) -> Self {
ObjectError {
op,
path: path.to_string(),
source: source.into(),
}
}
}
其次是考虑用户体验。
std::io::Error
已经在各个 IO 相关的库中被广泛使用:返回 io::Error
能够极大地简化用户的处理逻辑,不需要再针对 OpenDAL 做特别的处理。而且用户的心智负担也会特别小,之前使用 std::fs
是如何检查错误的,使用 opendal 也是如何检查:
if let Err(e) = op.object("test_file").metadata().await {
if e.kind() == ErrorKind::NotFound {
println!("object not exist")
}
}
综合以上改动,我在 PR deps: Bump to OpenDAL v0.4 中调整了 Databend 关于存储层的错误处理逻辑:
清理了没有被良好定义的错误:
- UnknownStorageSchemeName(3001),
- SecretKeyNotSet(3002),
- DalTransportError(3003),
- DalPathNotFound(3004),
- SerdeError(3005),
- DalError(3006),
- DalStatError(3007),
+ StorageNotFound(3001),
+ StoragePermissionDenied(3002),
+ StorageOther(4000)
实现了 io::Error
到内置 ErrorCode
的无缝转换
impl From<std::io::Error> for ErrorCode {
fn from(error: std::io::Error) -> Self {
use std::io::ErrorKind;
let msg = format!("{} ({})", error.kind(), &error);
match error.kind() {
ErrorKind::NotFound => ErrorCode::StorageNotFound(msg),
ErrorKind::PermissionDenied => ErrorCode::StoragePermissionDenied(msg),
_ => ErrorCode::StorageOther(msg),
}
}
}
这样 Databend 就能够比较优雅地处理涉及到 opendal 的错误了:
- let meta = object.metadata().await.map_err(|e| match e.kind() {
- DalErrorKind::ObjectNotExist => ErrorCode::DalPathNotFound(e.to_string()),
- _ => ErrorCode::DalTransportError(e.to_string()),
- })?;
+ let meta = object.metadata().await?;
舒服了!
修改 Databend storage.disk 为 storage.fs
命名是头等大事:好名字能够准确直接地反应自身的功用,降低用户的理解成本;而坏名字则会引起混淆,让用户产生错误的理解。
长期以来,Databend 的 storage type 就一直在错误地混淆 aws_s3
与 s3
,disk
与 fs
,前者在 opendal 立项的时候就予以纠正了,而后者直到最近才在 PR *: Rename storage.disk to storage.fs 中修复。
首先聊聊 aws_s3
与 s3
。
s3
已经是一个事实上的业界标准,而不仅仅是 AWS 的某个产品。当我们提到 s3
时,更多的时候是在指兼容 s3 API 的对象存储服务,包括但不限于 AWS S3
,Aliyun OSS
,Tencent COS
,minio
,digitialocean S3
等一系列服务。而使用 aws_s3
时,则通常是在特指 AWS
提供的 S3
服务。
因此如果我们在代码和配置中强调 aws_s3
,就会引起用户的混淆。
其次聊聊 disk
与 fs
。
在 Proposal: Rename storage.disk to storage.fs 中,我提到了 disk
与 fs
的区别:disk
通常用来指代块设备,比如 SSD
和 HDD
,在云上则是指形如 EBS
这样的服务,他们对外暴露的是块接口;而 fs
则是用来指代文件系统,比如 ext4
,btrfs
,在云上指形如 EFS
这样的服务,他们对外暴露的接口是(类)POSIX 语义的文件接口。在 从应用接口视角看存储系统 中,我对这个话题有更详细的阐述。
显然的,Databend 不能直接基于块设备工作:它需要文件系统提供支持。不仅如此,disk
还抹除了更多的可能性,比如:NFS
,基于 ceph
,s3fs
,juicefs
的 FUSE
等等。
使用 disk
作为 storage type 会引起用户的混淆。
好在这些问题已经得到了纠正:)
mergify 踩坑小技巧
mergify
是一个用于 Pull Requests 自动化的服务,它支持按照预先制定好的策略对 PR 排队并 merge。由于 Github Action API 的限制,在指定这些策略的时候需要注意 Validating All Status Checks 的坑:
There is no such thing as "every status check" in GitHub.
- 每个 PR 有自己的 status checks 列表
- 在创建/有新提交的时候,PR 是没有 status checks 的
- status checks 可能没有被正确汇报(比如 jenkins 挂了),导致它没有出现在列表中
基于以上限制,如果使用
#check-failure=0
check-success~=build
这样的判断,就会很容易导致测试实际上并没有跑完就 pass 了。
Databend 之前也踩过一些坑,使用 '#check-success>=12'
来保证通过的测试数量大于 12 个这样的 workaround 来解决。但是这个解决方案并不彻底,随着时间的推移,新测试的增加,我们发现有些 PR 在测试没有通过的情况下也被 merge 了。为了彻底解决这个问题,我在 PR ci: Fix mergify status check 中按照文档的描述显式指定了所有必须通过的测试列表。
RisingWave 蹭贡献
RisingWave 是一个 Rust 开发的云原生流式数据库,主打物化视图增量更新,兼容 PostgreSQL 接口,用来构建实时应用。我也抓着开源的机会蹭了一些贡献。
在 PR refactor(pgwire): Match on char to avoid utf8 convert 中,我修复了一处没有必要的 utf-8 convert:
- let val = &[stream.read_u8().await?];
- let tag = std::str::from_utf8(val).unwrap();
+ let val = stream.read_u8().await?;
这里其实可以直接对 char 做判断,不需要转换成 str。
在 PR refactor(storage/object/s3): Remove not needed check on ranged request 中,我尝试去掉一个不必要的 content length 检查,但最终发现这个检查是有必要的:因为 hyper
不会检查返回的 body content length 长度,所以如果服务器端返回的 body length 和 content-length 不一致,应用就会出现非预期的行为。
使用 minio 进行高可用测试,频繁地在读取数据时 kill -9 就会很容易遇到这样的情况:server 已经返回了 HTTP status code 和 headers 但是 body 因为服务器已经被 kill 了没有发过来,用户的读取就这样直接完成了。我在 PR 中给出了完整的复现代码:
async fn handle(_: Request<Body>) -> Result<Response<Body>, Infallible> {
let resp = Response::builder()
.status(StatusCode::OK)
.header(CONTENT_LENGTH, 1024)
.body(Body::empty())
.expect("must success");
Ok(resp)
}
#[tokio::test]
async fn test_hyper() -> Result<()> {
let addr = SocketAddr::from(([127, 0, 0, 1], 9900));
let make_svc = hyper::service::make_service_fn(|_conn| async {
Ok::<_, Infallible>(hyper::service::service_fn(handle))
});
let server = hyper::Server::bind(&addr).serve(make_svc);
tokio::spawn(async move {
if let Err(e) = server.await {
eprintln!("server error: {}", e);
}
});
let client: hyper::Client<
hyper_tls::HttpsConnector<hyper::client::HttpConnector>,
hyper::Body,
> = hyper::Client::builder().build(hyper_tls::HttpsConnector::new());
let req = client
.get(http::Uri::from_static("http://127.0.0.1:9900"))
.await
.expect("must success");
let bs = hyper::body::to_bytes(req.into_body())
.await
.expect("must success");
println!("{:?}", bs);
Ok(())
}
不知道能否在 hyper 修复这个问题?
这个周期有意思的工作就是这些,下个周期再见~