Iteration 14 从 5/21 开始到 6/4 结束,为期两周。这个周期主要在做 Databend 的压缩支持,具体的来说是支持如下功能:
支持读取 Stage/Location 中的压缩文件
copy into ontime200 from '@s1' FILES = ('ontime_200.csv.gz')
FILE_FORMAT = (type = 'CSV' field_delimiter = ',' compression = 'gzip' record_delimiter = '\n' skip_header = 1);
支持流式上传压缩文件:
curl -H "insert_sql:insert into ontime_streaming_load format Csv" \
-H "skip_header:1" \
-H "compression:zstd" \
-F "upload=@/tmp/ontime_200.csv.zst" \
-u root: \
-XPUT \
"http://localhost:8000/v1/streaming_load"
Rust 目前压缩算法的库已经比较齐全,所以主要的工作在于如何将解压缩的逻辑与 Databend 现有的逻辑整合到一起。
解压缩工作流程
大部分解压缩算法都可以抽象为这样的状态机:
其中最复杂的是 Decode
流程:
- 当数据消费完毕后需要获取更多数据
- 当数据还没有消费完成时需要再次调用
Decode
有的压缩算法会支持多个对象,因此在
Flush
状态时也有可能进行reinit
,重新开始一轮新的解压缩,此处不再赘述。
OpenDAL 中就运用这样的抽象:
pub enum DecompressState {
Reading,
Decoding,
Flushing,
Done,
}
对外暴露了 DecompressDecoder
:
impl DecompressDecoder {
/// Get decompress state
pub fn state(&self) -> DecompressState {}
/// Fetch more data from underlying reader.
pub fn fill(&mut self, bs: &[u8]) -> usize {}
/// Decode data into output.
pub fn decode(&mut self, output: &mut [u8]) -> Result<usize> {}
/// Finish a decompress press, flushing remaining data into output.
pub fn finish(&mut self, output: &mut [u8]) -> Result<usize> {}
}
为了方便用户使用,OpenDAL 在 DecompressDecoder
基础上实现了 DecompressReader
impl<R: BytesRead> futures::io::AsyncRead for DecompressReader<R> {}
对接 async-compression
async-compression 是由 @Nemo157 开发的异步压缩库,支持了绝大多数常用的压缩算法。OpenDAL 就是基于 async-compression 实现的:
其内部使用了 Decode
trait:
pub trait Decode {
/// Reinitializes this decoder ready to decode a new member/frame of data.
fn reinit(&mut self) -> Result<()>;
/// Returns whether the end of the stream has been read
fn decode(
&mut self,
input: &mut PartialBuffer<impl AsRef<[u8]>>,
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
) -> Result<bool>;
/// Returns whether the internal buffers are flushed
fn flush(&mut self, output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>)
-> Result<bool>;
/// Returns whether the internal buffers are flushed
fn finish(
&mut self,
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
) -> Result<bool>;
}
async-compression 为了避免将内部实现的细节暴露给用户,codec
相关的模块都是 private 的,用户只能通过 bufread
等方式来调用。
但这种方式在 Databend 中是行不通的:Databend 为了能够更充分的利用机器资源,对异步和同步任务进行了严格的划分:
- CPU Bound 的任务必须要在同步的 Runtime 中进行
- IO Bound 的任务必须要在异步的 Runtime 中进行
如果在异步 Runtime 中进行解压缩,可能会 block runtime,降低整体的吞吐性能。
想要自己掌控解压缩发生在哪个 runtime,我们就必须要能直接操作底层的 Decode
,为此我提交了一份: proposal: Export codec and Encode/Decode trait。作者最近刚好也在从事相关的工作,想把底层的一些 Codec 暴露出来。于是我在 proposal 中详细地介绍了 Databend 的 Use Case,并分享了自己的临时 Workaround。实际上 async-compression 的代码组织的很好,只需要将内部的 Decode
trait 及其实现都 public 出来即可。
由于发布到 crate 的包必须使用 tagged version,所以我把这个 wordaround 也发布成了一个 crate: async-compression-issue-150-workaround。
这里用到了一个 Cargo.toml
的小技巧:为 package 指定一个 alias,让它能够在不修改 crate name 的情况下使用自己指定的另一个包。
# Temp workaround, should come back to tagged version after https://github.com/Nemo157/async-compression/issues/150 resolved.
async-compression = { package = "async-compression-issue-150-workaround", version = "0.3.15-issue-150", features = [
"futures-io",
"all-algorithms",
], optional = true }
对接 Databend
完成了 OpenDAL 测的开发,Databend 这边主要是对接和测试工作了。在对接的时候意外的发现 Databend 目前处理 Streaming Loading 和 COPY FROM STAGE 走的是两条完全不同的路径,所以解压缩的处理也不得实现了两遍,未来希望能将他们统一起来。
- feat: Add decompress support for COPY INTO and streaming loading 增加了初步的解压缩支持
- docs: Add compression related docs 增加了解压缩功能相关的文档
- feat(query): Add support for compression auto and raw_deflate 完善了 AUTO 和 Raw Deflate 算法的细节
- fix(query): Fix compressed buf not consumed correctly 修复了解压缩实现中的 BUG
目前为止,Databend 已经能够支持 GZIP
,BZ2
,BROTLI
,ZSTD
,DEFLATE
,RAW_DEFLATE
等压缩算法,未来根据需要还可以进一步扩展,欢迎大家来试用解压缩功能!
下一步计划
- 重构流式加载和从 STAGE 加载的逻辑,尽可能复用相同的逻辑
- 解压缩性能测试及其优化
- 支持 LZO,SNAPPY 等压缩算法
- 支持 ZIP,TAR 等归档格式