Iteration 14 从 5/21 开始到 6/4 结束,为期两周。这个周期主要在做 Databend 的压缩支持,具体的来说是支持如下功能:

支持读取 Stage/Location 中的压缩文件

copy into ontime200 from '@s1' FILES = ('ontime_200.csv.gz') 
    FILE_FORMAT = (type = 'CSV' field_delimiter = ',' compression = 'gzip'  record_delimiter = '\n' skip_header = 1);

支持流式上传压缩文件:

curl -H "insert_sql:insert into ontime_streaming_load format Csv" \
    -H "skip_header:1" \
    -H "compression:zstd" \
    -F  "upload=@/tmp/ontime_200.csv.zst" \
    -u root: \
    -XPUT \
    "http://localhost:8000/v1/streaming_load"

Rust 目前压缩算法的库已经比较齐全,所以主要的工作在于如何将解压缩的逻辑与 Databend 现有的逻辑整合到一起。


解压缩工作流程

大部分解压缩算法都可以抽象为这样的状态机:

其中最复杂的是 Decode 流程:

  • 当数据消费完毕后需要获取更多数据
  • 当数据还没有消费完成时需要再次调用 Decode

有的压缩算法会支持多个对象,因此在 Flush 状态时也有可能进行 reinit,重新开始一轮新的解压缩,此处不再赘述。

OpenDAL 中就运用这样的抽象:

pub enum DecompressState {
    Reading,
    Decoding,
    Flushing,
    Done,
}

对外暴露了 DecompressDecoder

impl DecompressDecoder {
    /// Get decompress state
    pub fn state(&self) -> DecompressState {}
    /// Fetch more data from underlying reader.
    pub fn fill(&mut self, bs: &[u8]) -> usize {}
    /// Decode data into output.
    pub fn decode(&mut self, output: &mut [u8]) -> Result<usize> {}
    /// Finish a decompress press, flushing remaining data into output.
    pub fn finish(&mut self, output: &mut [u8]) -> Result<usize> {}
}

为了方便用户使用,OpenDAL 在 DecompressDecoder 基础上实现了 DecompressReader

impl<R: BytesRead> futures::io::AsyncRead for DecompressReader<R> {}

对接 async-compression

async-compression 是由 @Nemo157 开发的异步压缩库,支持了绝大多数常用的压缩算法。OpenDAL 就是基于 async-compression 实现的:

其内部使用了 Decode trait:

pub trait Decode {
    /// Reinitializes this decoder ready to decode a new member/frame of data.
    fn reinit(&mut self) -> Result<()>;

    /// Returns whether the end of the stream has been read
    fn decode(
        &mut self,
        input: &mut PartialBuffer<impl AsRef<[u8]>>,
        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
    ) -> Result<bool>;

    /// Returns whether the internal buffers are flushed
    fn flush(&mut self, output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>)
        -> Result<bool>;

    /// Returns whether the internal buffers are flushed
    fn finish(
        &mut self,
        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
    ) -> Result<bool>;
}

async-compression 为了避免将内部实现的细节暴露给用户,codec 相关的模块都是 private 的,用户只能通过 bufread 等方式来调用。

但这种方式在 Databend 中是行不通的:Databend 为了能够更充分的利用机器资源,对异步和同步任务进行了严格的划分:

  • CPU Bound 的任务必须要在同步的 Runtime 中进行
  • IO Bound 的任务必须要在异步的 Runtime 中进行

如果在异步 Runtime 中进行解压缩,可能会 block runtime,降低整体的吞吐性能。

想要自己掌控解压缩发生在哪个 runtime,我们就必须要能直接操作底层的 Decode,为此我提交了一份: proposal: Export codec and Encode/Decode trait。作者最近刚好也在从事相关的工作,想把底层的一些 Codec 暴露出来。于是我在 proposal 中详细地介绍了 Databend 的 Use Case,并分享了自己的临时 Workaround。实际上 async-compression 的代码组织的很好,只需要将内部的 Decode trait 及其实现都 public 出来即可。

由于发布到 crate 的包必须使用 tagged version,所以我把这个 wordaround 也发布成了一个 crate: async-compression-issue-150-workaround

这里用到了一个 Cargo.toml 的小技巧:为 package 指定一个 alias,让它能够在不修改 crate name 的情况下使用自己指定的另一个包。

# Temp workaround, should come back to tagged version after https://github.com/Nemo157/async-compression/issues/150 resolved.
async-compression = { package = "async-compression-issue-150-workaround", version = "0.3.15-issue-150", features = [
  "futures-io",
  "all-algorithms",
], optional = true }

对接 Databend

完成了 OpenDAL 测的开发,Databend 这边主要是对接和测试工作了。在对接的时候意外的发现 Databend 目前处理 Streaming Loading 和 COPY FROM STAGE 走的是两条完全不同的路径,所以解压缩的处理也不得实现了两遍,未来希望能将他们统一起来。

目前为止,Databend 已经能够支持 GZIPBZ2BROTLIZSTDDEFLATERAW_DEFLATE 等压缩算法,未来根据需要还可以进一步扩展,欢迎大家来试用解压缩功能!

下一步计划

  • 重构流式加载和从 STAGE 加载的逻辑,尽可能复用相同的逻辑
  • 解压缩性能测试及其优化
  • 支持 LZO,SNAPPY 等压缩算法
  • 支持 ZIP,TAR 等归档格式