A First Look at Hyperspace: Indexing Delta Lake Tables

Posted by cdai


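1. Introduction

Hyperspace is an open-source indexing subsystem for data lakes, developed by Microsoft.

1.1 Features

  • Provides a well-defined index management API (4.1 Creating an index, 5.2 Incremental refresh)

    • Gives users more freedom, since users know their own use cases best
    • Does not try to solve every problem, because some problems have no single right answer
  • Independent of the data and its metadata: an index keeps its own metadata/log (4.2 Hyperspace log)

  • Aware of the version of the underlying data, so the index can "time travel" together with the data (5.1 Time Travel support)

  • Thanks to Hybrid Scan, the index does not have to be fully in sync with the source data (5.3 Hybrid Scan)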

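1.2 In development

The Hyperspace project is currently quite active, and several important features are still in development. To preview them, try the code on the master branch: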

  • Spark 3.x: https://github.com/microsoft/hyperspace/issues/407

  • Iceberg: https://github.com/microsoft/hyperspace/issues/318

  • Index implementations

    • Z-ordering: https://github.com/microsoft/hyperspace/issues/515
    • Data skipping: https://github.com/microsoft/hyperspace/issues/441

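2. Installation

2.1 Software versions

  • Hyperspace 0.4.0 supports indexes on Delta Lake tables
  • Delta Lake 0.6.1 is the last release that supports Spark 2.x
  • Spark 2.4.2 and Scala 2.12.8 are compatible with the Hyperspace and Delta Lake versions above

Because support for Apache Spark 3.x is still in development, Spark 2.4.2 appears to be the only Spark version on which Hyperspace and Delta Lake currently work together.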
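
For a standalone application rather than the shell, the same version combination could be pinned in an sbt build. This is only a sketch: the project name is hypothetical, and the Spark dependency is marked as provided on the assumption that the job is submitted to an existing Spark 2.4.2 installation.

```scala
// build.sbt (hypothetical project; versions taken from the list above)
name := "hyperspace-delta-demo"
scalaVersion := "2.12.8"

libraryDependencies ++= Seq(
  // Assumed to be supplied by the Spark 2.4.2 installation at runtime.
  "org.apache.spark" %% "spark-sql" % "2.4.2" % "provided",
  "io.delta" %% "delta-core" % "0.6.1",
  "com.microsoft.hyperspace" %% "hyperspace-core" % "0.4.0"
)
```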

2.2 Starting the CLI

bin/spark-shell --packages io.delta:delta-core_2.12:0.6.1,com.microsoft.hyperspace:hyperspace-core_2.12:0.4.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

Ivy Default Cache set to: /Users/daichen/.ivy2/cache
The jars for the packages stored in: /Users/daichen/.ivy2/jars
:: loading settings :: url = jar:file:/Users/daichen/Software/spark-2.4.2-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
io.delta#delta-core_2.12 added as a dependency
com.microsoft.hyperspace#hyperspace-core_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-040a057e-2b7f-4d3a-a658-b93e6474dc47;1.0
    confs: [default]
    found io.delta#delta-core_2.12;0.6.1 in central
    found org.antlr#antlr4;4.7 in central
    found org.antlr#antlr4-runtime;4.7 in local-m2-cache
    found org.antlr#antlr-runtime;3.5.2 in local-m2-cache
    found org.antlr#ST4;4.0.8 in local-m2-cache
    found org.abego.treelayout#org.abego.treelayout.core;1.0.3 in local-m2-cache
    found org.glassfish#javax.json;1.0.4 in local-m2-cache
    found com.ibm.icu#icu4j;58.2 in local-m2-cache
    found com.microsoft.hyperspace#hyperspace-core_2.11;0.3.0 in central
:: resolution report :: resolve 351ms :: artifacts dl 9ms
    :: modules in use:
    com.ibm.icu#icu4j;58.2 from local-m2-cache in [default]
    com.microsoft.hyperspace#hyperspace-core_2.11;0.3.0 from central in [default]
    io.delta#delta-core_2.12;0.6.1 from central in [default]
    org.abego.treelayout#org.abego.treelayout.core;1.0.3 from local-m2-cache in [default]
    org.antlr#ST4;4.0.8 from local-m2-cache in [default]
    org.antlr#antlr-runtime;3.5.2 from local-m2-cache in [default]
    org.antlr#antlr4;4.7 from central in [default]
    org.antlr#antlr4-runtime;4.7 from local-m2-cache in [default]
    org.glassfish#javax.json;1.0.4 from local-m2-cache in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   9   |   0   |   0   |   0   ||   9   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-040a057e-2b7f-4d3a-a658-b93e6474dc47
    confs: [default]
    0 artifacts copied, 9 already retrieved (0kB/8ms)
21/12/09 16:15:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.4.51:4040
Spark context available as 'sc' (master = local[*], app id = local-1639095359127).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.2
      /_/

Using Scala version 2.12.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_312)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

3. Delta Lake table

3.1 Creating the table

All examples in this article use the employees.json dataset that ships with Spark. First, let's use it to create a partitioned Delta Lake table.

scala> val employees = spark.read.json("examples/src/main/resources/employees.json")
employees: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]

scala> employees.show()
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+

scala> employees.write.partitionBy("salary").format("delta").save("/tmp/delta-table/employees")

Now let's look at the directory layout of the Delta Lake table:

$ tree /tmp/delta-table/employees
/tmp/delta-table/employees
├── _delta_log
│   └── 00000000000000000000.json
├── salary=3000
│   └── part-00000-e0493ade-23d4-402e-bf43-0ad727b0754e.c000.snappy.parquet
├── salary=3500
│   └── part-00000-5bae7fa3-43c3-43a7-85a2-286848b5d589.c000.snappy.parquet
├── salary=4000
│   └── part-00000-090c7267-5a47-439a-8ca6-3ae670969ac3.c000.snappy.parquet
└── salary=4500
    └── part-00000-375f485f-d021-4625-b598-1f86b3ad8953.c000.snappy.parquet
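
Because the table is partitioned by salary, a filter on that column only needs the files of one directory. The following is a small sketch, assuming the spark-shell session above is still open; the variable names are made up for illustration.

```scala
// Assumes the spark-shell session above (Delta package loaded, `spark` in scope).
val fromDisk = spark.read.format("delta").load("/tmp/delta-table/employees")

// The predicate is on the partition column, so only salary=4500 has to be read.
val wellPaid = fromDisk.where("salary = 4500").select("name")

// The plan should list the predicate under PartitionFilters.
wellPaid.explain()

val names = wellPaid.collect().map(_.getString(0)).toSeq
println(names.mkString(", "))
```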

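3.2 Delta Lake log

The Delta log records every add and remove of data files (data lake storage such as S3 does not support updating a file in place, or even appending to it). Each commit in the log is a JSON file, and Delta Lake also periodically writes Parquet checkpoints of the log: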

$ cat /tmp/delta-table/employees/_delta_log/00000000000000000000.json
{"commitInfo":{"timestamp":1639095874103,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[\"salary\"]"},"isBlindAppend":true,"operationMetrics":{"numFiles":"4","numOutputBytes":"1679","numOutputRows":"4"}}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"0c5bbb47-1ee3-4637-883c-61284a73501b","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"salary\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["salary"],"configuration":{},"createdTime":1639095873565}}
{"add":{"path":"salary=3000/part-00000-e0493ade-23d4-402e-bf43-0ad727b0754e.c000.snappy.parquet","partitionValues":{"salary":"3000"},"size":434,"modificationTime":1639095873996,"dataChange":true}}
{"add":{"path":"salary=3500/part-00000-5bae7fa3-43c3-43a7-85a2-286848b5d589.c000.snappy.parquet","partitionValues":{"salary":"3500"},"size":425,"modificationTime":1639095874020,"dataChange":true}}
{"add":{"path":"salary=4000/part-00000-090c7267-5a47-439a-8ca6-3ae670969ac3.c000.snappy.parquet","partitionValues":{"salary":"4000"},"size":416,"modificationTime":1639095874044,"dataChange":true}}
{"add":{"path":"salary=4500/part-00000-375f485f-d021-4625-b598-1f86b3ad8953.c000.snappy.parquet","partitionValues":{"salary":"4500"},"size":404,"modificationTime":1639095874069,"dataChange":true}}

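To make the add/remove model concrete, here is a toy replay of such a log in plain Scala. It is a simplified sketch, not Delta Lake's implementation: the `DeltaLogSketch` object, its case classes, the shortened file names and the second commit are all hypothetical, and the real log carries more action types and fields.

```scala
// Toy model of a Delta log; these types are illustrative, not Delta Lake's API.
object DeltaLogSketch {
  sealed trait Action
  final case class AddFile(path: String) extends Action
  final case class RemoveFile(path: String) extends Action

  // Replay commits in version order. Data files are never edited in place:
  // "add" brings a file into the snapshot, "remove" takes it out.
  def liveFiles(commits: Seq[Seq[Action]]): Set[String] =
    commits.flatten.foldLeft(Set.empty[String]) {
      case (files, AddFile(path))    => files + path
      case (files, RemoveFile(path)) => files - path
    }

  // Version 0 mirrors the four "add" actions in the log above (names shortened).
  val v0: Seq[Action] = Seq(
    AddFile("salary=3000/part-a.parquet"),
    AddFile("salary=3500/part-b.parquet"),
    AddFile("salary=4000/part-c.parquet"),
    AddFile("salary=4500/part-d.parquet"))

  // Hypothetical version 1: the salary=3000 partition is rewritten.
  val v1: Seq[Action] = Seq(
    RemoveFile("salary=3000/part-a.parquet"),
    AddFile("salary=3000/part-e.parquet"))
}

// Replaying a prefix of the log yields the table as of that version.
val atVersion0 = DeltaLogSketch.liveFiles(Seq(DeltaLogSketch.v0))
val atVersion1 = DeltaLogSketch.liveFiles(Seq(DeltaLogSketch.v0, DeltaLogSketch.v1))
println(atVersion1.toSeq.sorted.mkString("\n"))
```

Replaying only the first commit gives the table as it was at version 0, which is the idea behind reading an older version of a Delta table.
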
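4. Hyperspace basics

4.1 Creating an index

Let's start by creating an index (Hyperspace currently mainly supports covering indexes, where the index also stores the data of other columns so that a query can be answered from the index alone, without reading the source data):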

scala> val employees = spark.read.format("delta").load("/tmp/delta-table/employees")
employees: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]                                                                                          

scala> spark.conf.set("spark.hyperspace.index.sources.fileBasedBuilders",
     |       "com.microsoft.hyperspace.index.sources.delta.DeltaLakeFileBasedSourceBuilder," +
     |       "com.microsoft.hyperspace.index.sources.default.DefaultFileBasedSourceBuilder")

scala> import com.microsoft.hyperspace._
import com.microsoft.hyperspace._

scala> val hyperspace = new Hyperspace(spark)
hyperspace: com.microsoft.hyperspace.Hyperspace = com.microsoft.hyperspace.Hyperspace@515a3572

scala> import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index._

scala> hyperspace.createIndex(employees, IndexConfig("deltaIndex", indexedColumns = Seq("name"), includedColumns = Seq("salary")))
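
The roles of indexedColumns and includedColumns can be pictured with a toy in-memory model. This is only a sketch of the covering-index idea: the `CoveringIndexSketch` object is hypothetical, and a Scala Map stands in for the Parquet index files that Hyperspace actually writes.

```scala
// Toy in-memory model of a covering index; not how Hyperspace stores data.
object CoveringIndexSketch {
  final case class Employee(name: String, salary: Long)

  // Source rows, as in employees.json.
  val table: Seq[Employee] = Seq(
    Employee("Michael", 3000L),
    Employee("Andy", 4500L),
    Employee("Justin", 3500L),
    Employee("Berta", 4000L))

  // indexedColumns = name (the lookup key); includedColumns = salary
  // (copied into the index so a lookup never goes back to `table`).
  val index: Map[String, Seq[Long]] =
    table.groupBy(_.name).map { case (name, rows) => name -> rows.map(_.salary) }

  // SELECT salary WHERE name = ? answered from the index alone.
  def salaryOf(name: String): Seq[Long] = index.getOrElse(name, Seq.empty)
}

println(CoveringIndexSketch.salaryOf("Andy"))
```

A query that also needed a column stored in neither list could not be served by this index and would have to read the source table.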

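4.2 Hyperspace log

In this setup the index data and its log end up under spark-warehouse/indexes in the Spark home directory (spark-2.4.2-bin-hadoop2.7):

$ tree spark-warehouse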
spark-warehouse
└── indexes
    └── deltaIndex
        ├── _hyperspace_log
        │   ├── 0
        │   ├── 1
        │   └── latestStable
        └── v__=0
            ├── _SUCCESS
            ├── part-00071-46fed65d-a011-4d69-9267-30c40c623a78_00071.c000.snappy.parquet
            ├── part-00164-46fed65d-a011-4d69-9267-30c40c623a78_00164.c000.snappy.parquet
            ├── part-00165-46fed65d-a011-4d69-9267-30c40c623a78_00165.c000.snappy.parquet
            └── part-00169-46fed65d-a011-4d69-9267-30c40c623a78_00169.c000.snappy.parquet
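
The metadata kept in this log can also be inspected from the shell instead of reading the files directly. The snippet below is a sketch that assumes the session above, with `hyperspace` in scope and `deltaIndex` already created; the column names `name` and `state` are an assumption based on the index summary printed by Hyperspace 0.4.x and may differ in other versions.

```scala
// Assumes the spark-shell session above; `hyperspace` was created in 4.1.
// `indexes` returns a DataFrame with one summary row per index.
val indexes = hyperspace.indexes
indexes.show()

// Assumed column names (`name`, `state`); check the output of show() first.
val activeDeltaIndex = indexes.filter("name = 'deltaIndex' AND state = 'ACTIVE'")
println(activeDeltaIndex.count())
```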

4.3 Explain for indexes

scala> val query = employees.filter(employees("name") === "Andy").select("salary")
query: org.apache.spark.sql.DataFrame = [salary: bigint]

scala> query.explain
== Physical Plan ==
*(1) Project [salary#325L]
+- *(1) Filter (isnotnull(name#324) && (name#324 = Andy))
   +- *(1) FileScan parquet [name#324,salary#325L] Batched: true, Format: Parquet, Location: TahoeLogFileIndex[file:/tmp/delta-table/employees], PartitionCount: 4, PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,Andy)], ReadSchema: struct<name:string>

scala> query.show
+------+
|salary|
+------+
|  4500|
+------+


scala> spark.enableHyperspace
res3: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@ac52e35

scala> query.explain
== Physical Plan ==
*(1) Project [salary#325L]
+- *(1) Filter (isnotnull(name#324) && (name#324 = Andy))
   +- *(1) FileScan Hyperspace(Type: CI, Name: deltaIndex, LogVersion: 1) [name#324,salary#325L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/daichen/Software/spark-2.4.2-bin-hadoop2.7/spark-warehouse/indexes/..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,Andy)], ReadSchema: struct<name:string,salary:bigint>

scala> query.show
+------+
|salary|
+------+
|  4500|
+------+
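
The same comparison can be made programmatically by toggling Hyperspace and looking for the index scan in the physical plan. This is a sketch under the assumptions of the session above (`employees` loaded from the Delta table, `deltaIndex` created); the helper names `andySalary` and `usesIndex` are made up, and matching on the plan text is a convenience, not an official API.

```scala
import com.microsoft.hyperspace._

// Rebuild the query on every call so it is planned under the current setting.
def andySalary = employees.filter(employees("name") === "Andy").select("salary")

// The index scan appears as "Hyperspace(Type: CI, ...)" in the plan text.
def usesIndex: Boolean =
  andySalary.queryExecution.executedPlan.toString.contains("Hyperspace(")

spark.disableHyperspace
val withoutIndex = usesIndex

spark.enableHyperspace
val withIndex = usesIndex

println(s"index used when disabled: $withoutIndex, when enabled: $withIndex")
```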

Besides the Explain of a Spark DataFrame, Hyperspace's own Explain provides more detail:

scala> hyperspace.explain(query, verbose = true)
=============================================================
Plan with indexes:
=============================================================
Project [salary#325L]
+- Filter (isnotnull(name#324) && (name#324 = Andy))
   <----+- FileScan Hyperspace(Type: CI, Name: deltaIndex, LogVersion: 1) [name#324,salary#325L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/daichen/Software/spark-2.4.2-bin-hadoop2.7/spark-warehouse/indexes/..., PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,Andy)], ReadSchema: struct<name:string,salary:bigint>---->

=============================================================
Plan without indexes:
=============================================================
Project [salary#325L]
+- Filter (isnotnull(name#324) && (name#324 = Andy))
   <----+- FileScan parquet [name#324,salary#325L] Batched: true, Format: Parquet, Location: TahoeLogFileIndex[file:/tmp/delta-table/employees], PartitionCount: 4, PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,Andy)], ReadSchema: struct<name:string>---->

=============================================================
Indexes used:
=============================================================
deltaIndex:file:/Users/daichen/Software/spark-2.4.2-bin-hadoop2.7/spark-warehouse/indexes/deltaIndex/v__=0

=============================================================
Physical operator stats:
=============================================================
+-----------------------------------------------------------+-------------------+------------------+----------+
|                                          Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|
+-----------------------------------------------------------+-------------------+------------------+----------+
|*Scan Hyperspace(Type: CI, Name: deltaIndex, LogVersion: 1)|                  0|                 1|         1|
|                                              *Scan parquet|                  1|                 0|        -1|
|                                                     Filter|                  1|                 1
