时序数据库Influx-IOx源码学习十二(物理计划的执行)

Posted PP小能手

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了时序数据库Influx-IOx源码学习十二(物理计划的执行)相关的知识,希望对你有一定的参考价值。

InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。

InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。

接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。


上一章介绍了一个SQL是怎样从字符串转换到物理执行计划的,详情见:

https://my.oschina.net/u/3374539/blog/5035628

这一章主要记录一下物理计划是怎样执行的。


在上一篇文章的末尾,我们展示了物理计划之中存储的数据,这些数据代表了当前整个数据库中,能够与用户输入的查询表相关联的所有数据。

对于一般数据库来讲,在物理计划中更应该是指向索引相关的信息,举例来说:select * from table1 ,在物理计划里,应该是要拿到table1的表描述、存储数据的文件路径、文件大小、等等,而不是拿到真实数据。在文章最末尾中,有一段省略的数据,为什么会出现数据呢?其实这是数据库设计的缓存,缓存的数据本来就没有落到磁盘上,所以直接在物理计划中也会持有RBChunk和MBChunk的数据引用。

对于一个过滤而言,会在物理计划中产生对应的信息,展示如下:

select * from myMeasurement where fieldKey like \'value1\';

input: FilterExec { predicate: BinaryExpr { left: Column { name: "fieldKey" }, op: Like, right: Literal { value: Utf8("value1") } }

接下来看物理计划的执行代码:

pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
    match plan.output_partitioning().partition_count() {
        0 => Ok(vec![]),
        //单一块的时候直接取出数据
        1 => {
            let it = plan.execute(0).await?;
            common::collect(it).await
        }
        //多个数据块的时候就需要进行合并数据
        _ => {
            let plan = MergeExec::new(plan.clone());
            assert_eq!(1, plan.output_partitioning().partition_count());
            //这里分为了两步execute 和 collect
            common::collect(plan.execute(0).await?).await
        }
    }
}

接下来看plan.execute方法:

async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
       。。。省略
      tokio::spawn(async move {
           //这里的input就代表了上面展示的filter的input或者是数据的input
          let mut stream = match input.execute(part_i).await {
              Err(e) => {
                  let arrow_error = ArrowError::ExternalError(Box::new(e));
                  sender.send(Err(arrow_error)).await.ok();
                  return;
              }
              Ok(stream) => stream,
          };
          //计划执行完成之后返回一个stream,这里就是一直next获取完
          while let Some(item) = stream.next().await {
              sender.send(item).await.ok();
          }
      });
      。。。省略
}

上面的input代表了以下这么多东西: 

上面展示的为datafusion框架里的Plan,也就是通用sql都需要实现的功能,下面是iox项目中实现的Plan是完成数据获取的。

Plan之间的关系是嵌套的,想象一下上一章的大图,比如coalesceBatchesExec里可能还会包含filter,主要就是描述整个sql语句中都出现了什么。所有出现的plan就会对数据进行一次全面的过滤。

姑且不看过滤的细节,只看获取数据的部分(ExecutionPlan for IOxReadFilterNode)。

async fn execute(
        &self,
        partition: usize,
    ) -> datafusion::error::Result<SendableRecordBatchStream> {
        //因为在前面物理计划中得到了所有列,这里拿出列的名字
        let fields = self.schema.fields();
        let selection_cols = fields.iter().map(|f| f.name() as &str).collect::<Vec<_>>();
        //多个分区的时候可以根据分区号拿出chunk信息
        let ChunkInfo {
            chunk,
            chunk_table_schema,
        } = &self.chunk_and_infos[partition];

        //过滤出来列名字对应的arrow的filed,这里就存在不对应的问题,假如用户输入了ABC,但是chunk_table_schema中并不存在,这里就会是一个空
        let selection_cols = restrict_selection(selection_cols, &chunk_table_schema);
        let selection = Selection::Some(&selection_cols);
        //使用predicate过滤一次,但是我调试的时候一直是空的,也就是查询出所有数据。
        let stream = chunk
            .read_filter(&self.table_name, &self.predicate, selection)
            .map_err(|e| {
                DataFusionError::Execution(format!(
                    "Error creating scan for table {} chunk {}: {}",
                    self.table_name,
                    chunk.id(),
                    e
                ))
            })?;
        //这里使用SchemaAdapterStream的结构来填充空值列
        let adapter = SchemaAdapterStream::try_new(stream, Arc::clone(&self.schema))
            .map_err(|e| DataFusionError::Internal(e.to_string()))?;

        Ok(Box::pin(adapter))
    }

这个SchemaAdapterStream在代码中给了一个特别形象的描述:

///
///                       ┌────────────────┐                         ┌─────────────────────────┐
///                       ┌─────┐┌─────┐ ┌─────┐┌──────┐┌─────┐  ///                       A  ││  C  A  ││  B   ││  C  ///                       -  ││  -  -  ││  -   ││  -  /// ┌──────────────┐      1  ││ 10  ┌──────────────┐    1  ││ NULL ││ 10  /// Input     2  ││ 20  Adapter    2  ││ NULL ││ 20  /// Stream    ├────▶ 3  ││ 30  │────▶│    Stream    ├───▶│ 3  ││ NULL ││ 30  /// └──────────────┘      4  ││ 40  └──────────────┘    4  ││ NULL ││ 40  ///                       └─────┘└─────┘ └─────┘└──────┘└─────┘  ///                       ///                       Record Batch  Record Batch       ///                       └────────────────┘                         └─────────────────────────┘
///

接下来看如何实现数据查找的:

fn read_filter(
        &self,
        table_name: &str,
        predicate: &Predicate,
        selection: Selection<\'_>,
    ) -> Result<SendableRecordBatchStream, Self::Error> {
         //chunk存在变体,这里就是先判断是什么chunk,有三种MB,RB,ParquetFile
        match self {
            //还是在写入阶段的buffer,暂时不支持查询条件
            Self::MutableBuffer { chunk, .. } => {
                if !predicate.is_empty() {
                    return InternalPredicateNotSupported {
                        predicate: predicate.clone(),
                    }
                    .fail();
                }
                let batch = chunk
                    .read_filter(table_name, selection)
                    .context(MutableBufferChunk)?;

                Ok(Box::pin(MemoryStream::new(vec![batch])))
            }
            //不可写阶段的buffer,对数据进行过滤
            Self::ReadBuffer { chunk, .. } => {
                let rb_predicate =
                    to_read_buffer_predicate(&predicate).context(PredicateConversion)?;
                //读取数据并过滤
                let read_results = chunk
                    .read_filter(table_name, rb_predicate, selection)
                    .context(ReadBufferChunkError {
                        chunk_id: chunk.id(),
                    })?;
                //读取schema信息并过滤
                let schema = chunk
                    .read_filter_table_schema(table_name, selection)
                    .context(ReadBufferChunkError {
                        chunk_id: chunk.id(),
                    })?;
                //ReadFilterResultsStream是对不同的chunk类型实现的读取接口
                Ok(Box::pin(ReadFilterResultsStream::new(
                    read_results,
                    schema.into(),
                )))
            }
            //Parquet同理
            Self::ParquetFile { chunk, .. } => chunk
                .read_filter(table_name, predicate, selection)
                .context(ParquetFileChunkError {
                    chunk_id: chunk.id(),
                }),
        }
    }

数据到了这里就会按照你选择的表名、列名,将数据全部查询出来了。在代码中的predicate,一直是空的,暂时不确定是如何填充的,后面再看。

数据从这里全部查询出来之后,会返回给datafusion框架,继续按照开头写到的过滤器进行过滤,就是遍历一遍数据判断大于、小于或者like等等。

好了查询就先写到这里。

祝玩儿的开心!!


欢迎关注微信公众号:

或添加微信好友: liutaohua001

以上是关于时序数据库Influx-IOx源码学习十二(物理计划的执行)的主要内容,如果未能解决你的问题,请参考以下文章

第十二课时:时序分析中的基本概念和术语

(九十二)加速计的使用方法(过期方法+新方法)

yii2源码学习笔记(十二)

12小时计时法跟24小时计时法有啥不同

PgSQL · 源码分析 · PostgreSQL物理备份内部原理

SparkStreaming(源码阅读十二)