使用 Impala 在 INSERT INTO (Parquet) TABLE 期间对分区键进行排序

Posted

技术标签:

【中文标题】使用 Impala 在 INSERT INTO (Parquet) TABLE 期间对分区键进行排序【英文标题】:Sorting on partition keys during INSERT INTO (Parquet) TABLE with Impala 【发布时间】:2019-02-20 09:47:28 【问题描述】:

我有一个 ETL 作业,我想将 .csv 文件中的数据附加到 Impala 表中。目前,我通过使用新数据(.csv.lzo 格式)创建一个临时的外部 .csv 表来执行此操作,然后将其插入到主表中。

我使用的查询如下所示:

INSERT INTO TABLE main_table
PARTITION(yr, mth)
SELECT
    *,
    CAST(extract(ts, "year") AS SMALLINT) AS yr,
    CAST(extract(ts, "month") AS TINYINT) AS mth
FROM csv_table

其中main_table 定义如下(有几列被截断):

CREATE TABLE IF NOT EXISTS main_table (
    tid             INT,
    s1              VARCHAR,
    s2              VARCHAR,
    status          TINYINT,
    ts              TIMESTAMP,
    n1              DOUBLE,
    n2              DOUBLE,
    p               DECIMAL(3,2),
    mins            SMALLINT,
    temp            DOUBLE
)
PARTITIONED BY (yr SMALLINT, mth TINYINT)
STORED AS PARQUET

数据大约为几 GB(5500 万行,大约 30 列),这需要一个多小时才能运行。我很好奇为什么会出现这种情况(因为这对于本质上是追加操作的东西来说似乎相当长),并且在查询计划中遇到了这个:

F01:PLAN FRAGMENT [HASH(CAST(extract(ts, 'year') AS SMALLINT),CAST(extract(ts, 'month') AS TINYINT))] hosts=2 instances=2
|  Per-Host Resources: mem-estimate=1.01GB mem-reservation=12.00MB thread-reservation=1
WRITE TO HDFS [default.main_table, OVERWRITE=false, PARTITION-KEYS=(CAST(extract(ts, 'year') AS SMALLINT),CAST(extract(ts, 'month') AS TINYINT))]
|  partitions=unavailable
|  mem-estimate=1.00GB mem-reservation=0B thread-reservation=0
|
02:SORT
|  order by: CAST(extract(ts, 'year') AS SMALLINT) ASC NULLS LAST, CAST(extract(ts, 'month') AS TINYINT) ASC NULLS LAST
|  materialized: CAST(extract(ts, 'year') AS SMALLINT), CAST(extract(ts, 'month') AS TINYINT)
|  mem-estimate=12.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0
|  tuple-ids=1 row-size=1.29KB cardinality=unavailable
|  in pipelines: 02(GETNEXT), 00(OPEN)
|
01:EXCHANGE [HASH(CAST(extract(ts, 'year') AS SMALLINT),CAST(extract(ts, 'month') AS TINYINT))]
|  mem-estimate=2.57MB mem-reservation=0B thread-reservation=0
|  tuple-ids=0 row-size=1.28KB cardinality=unavailable
|  in pipelines: 00(GETNEXT)
|

显然,大部分时间和资源都花在对分区键进行排序上:

Operator       #Hosts  Avg Time  Max Time   #Rows  Est. #Rows  Peak Mem  Est. Peak Mem  Detail                                                                                          
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
02:SORT             2    17m16s    30m50s  55.05M          -1  25.60 GB       12.00 MB                                                                                                  
01:EXCHANGE         2   9s493ms  12s822ms  55.05M          -1  26.98 MB        2.90 MB  HASH(CAST(extract(ts, 'year') AS SMALLINT),CAST(extract(ts, 'month') AS TINYINT)) 
00:SCAN HDFS        2  51s958ms     1m10s  55.05M          -1  76.06 MB      704.00 MB  default.csv_table

Impala 为什么必须这样做?有什么方法可以对表进行分区而不必对分区键进行排序,或者在我的情况下加快它的速度,我试图附加的整个 .csv 文件只有 1 个或 2 个分区键?

编辑:事实证明,这很可能是因为我使用的是 Parquet 文件格式。我的问题仍然适用:当我知道实际上几乎不需要排序时,有没有办法加快排序?

相比之下,像 SELECT COUNT(*) FROM csv_table WHERE extract(ts, "year") = 2018 AND extract(ts, "month") = 1 这样的操作大约需要 2-3 分钟,而 ORDER BY(在插入过程中完成)需要一个多小时。此示例只有键 (2018,1) 和 (2018,2)。

【问题讨论】:

可以添加目标表的表定义吗? 感谢您的建议,我已添加它并意识到排序是因为我将表格存储为镶木地板文件格式,如下所述:blog.cloudera.com/blog/2017/12/…。不过,我仍然想知道如何加快速度。 您看到创建分区和集群表的行为是否相同? 我没有尝试过,但据我所知,Impala 没有使用聚簇表。此外,我之前评论中的博客文章明确指出,由于 parquet 格式,数据正在被排序。我希望在某个地方找到一个解决方案,告诉 Impala 不需要进行排序(也许一次插入一个分区) 这是正确的,但这是因为您的密钥分配(用于分区)高度集中在 1 个分区中。如果您强制集群添加不同的列,我认为这将提高性能 【参考方案1】:

您可以添加提示以禁用排序阶段。

INSERT INTO TABLE main_table
PARTITION(yr, mth)  /* +NOCLUSTERED */
SELECT
    *,
    CAST(extract(ts, "year") AS SMALLINT) AS yr,
    CAST(extract(ts, "month") AS TINYINT) AS mth
FROM csv_table

在这里解释:Optimizer Hints

/* +CLUSTERED / 和 / +NOCLUSTERED / 提示 / +CLUSTERED / 排序 数据在插入前由分区列保证只有一个 每个节点一次写入分区。使用此提示可减少 保持打开的文件数和保存在内存中的缓冲区数 同时地。该技术主要用于插入 Parquet 表,其中大块大小需要大量内存 一次为多个输出文件缓冲数据。这个提示是 在 Impala 2.8 或更高版本中可用。从 Impala 3.0 开始,/ +CLUSTERED */ 是 HDFS 表的默认行为。

/* +NOCLUSTERED */ 在插入前不按主键排序。这 提示在 Impala 2.8 或更高版本中可用。使用此提示时 插入 Kudu 表。

在低于 Impala 3.0 的版本中, /* +NOCLUSTERED */ 是 HDFS 表中的默认值。

【讨论】:

【参考方案2】:

Impala 进行排序是因为您使用动态分区。特别是对于具有计算统计数据的表,impala 不太擅长动态分区。我建议您在动态分区的情况下使用 hive。如果您不打算使用 hive,我的建议是:

    在每次插入语句之前计算 csv 表的统计信息。 如果第一步效果不好,对几个可能的分区使用静态分区,并运行超出可能范围的动态分区。例如;如果年份和月份只有一个选项:
INSERT 
INTO TABLE main_table
PARTITION(yr=2019, mth=2)
SELECT
    *
FROM csv_table where CAST(extract(ts, "year") AS SMALLINT)=2019 and CAST(extract(ts, "month") AS TINYINT)=2;  
INSERT INTO TABLE main_table
PARTITION(yr, mth)
SELECT
    *,
    CAST(extract(ts, "year") AS SMALLINT),
    CAST(extract(ts, "month") AS TINYINT)
FROM csv_table where CAST(extract(ts, "year") AS SMALLINT)!=2019 and CAST(extract(ts, "month") AS TINYINT)!=2;

这些语句缩小了动态分区将处理的集合。并且预计会减少花费的总时间。

【讨论】:

我会尝试(1);希望 Impala 能够意识到不需要排序。我确实尝试过(2),但这并没有改善任何东西(表格仍然得到排序,并且由于排序是在对数线性时间内完成的,所以排序所花费的时间或多或少是相同的)。谢谢!

以上是关于使用 Impala 在 INSERT INTO (Parquet) TABLE 期间对分区键进行排序的主要内容,如果未能解决你的问题,请参考以下文章

insert into linksvr select VS insert into from linksvr

MySQL中insert into语句的6种写法

SELECT INTO 和 INSERT INTO SELECT比较

insert into 语句出现错误,不知应该怎么将参数传入数据库?

Replace INTO和INSERT INTO的区别

insert into 插入数据问题