在 Hadoop UDF 输出中保留列数据类型(流)

Posted

技术标签:

【中文标题】在 Hadoop UDF 输出中保留列数据类型(流)【英文标题】:Preserving column data types in Hadoop UDF output (Streaming) 【发布时间】:2014-06-12 01:05:36 【问题描述】:

我正在用 Python 为 Hadoop 上的 Hive 查询编写 UDF。我的表有几个bigint 字段和几个string 字段。

我的 UDF 修改了 bigint 字段,将修改后的版本减去到一个新列(也应该是数字),并保持 string 字段不变。

当我在查询中运行我的 UDF 时,结果都是 string 列。

如何在我的 UDF 输出中保留或指定类型?


更多细节:

我的 Python UDF:

import sys
for line in sys.stdin:
    # pre-process row
    line = line.strip()
    inputs = line.split('\t')

    # modify numeric fields, calculate new field
    inputs[0], inputs[1], new_field = process(int(inputs[0]), int(inputs[1]))

    # leave rest of inputs as is; they are string fields.

    # output row
    outputs = [new_field]
    outputs.extend(inputs)
    print '\t'.join([str(i) for i in outputs]) # doesn't preserve types!

我将此 UDF 保存为 myudf.py 并将其添加到 Hive。

我的 Hive 查询:

CREATE TABLE calculated_tbl AS
SELECT TRANSFORM(bigintfield1, bigintfield2, stringfield1, stringfield2)
USING 'python myudf.py'
AS (calculated_int, modified_bif1, modified_bif2, stringfield1, stringfield2)
FROM original_tbl;

【问题讨论】:

【参考方案1】:

流式传输通过标准输出发送所有内容。它实际上只是引擎盖下的 hadoop 流之上的一个包装器。所有类型都转换为字符串,您在 python udf 中进行相应处理,然后作为字符串返回到 hive。 hive 中的 python 转换除了字符串之外永远不会返回任何内容。您可以尝试在子查询中进行转换,然后将结果转换为类型:

 SELECT cast(calculated_int as bigint)
        ,cast( modified_bif1 as bigint)
        ,cast( modified_bif2 as bigint) 
        ,stringfield1 
        ,stringfield2
 FROM ( 
 SELECT TRANSFORM(bigintfield1, bigintfield2, stringfield1, stringfield2)
 USING 'python myudf.py'
 AS (calculated_int, modified_bif1, modified_bif2, stringfield1, stringfield2)
 FROM original_tbl) A ;

Hive 可能会让您侥幸逃脱,如果没有,您需要将结果保存到表中,然后您可以在另一个查询中转换(转换)为不同的类型。

最后一个选项是只使用 Java UDF。仅映射 UDF 还不错,它们允许您指定返回类型。

更新(来自提问者):

上面的答案非常有效。几周后,我在阅读 O'Reilly 的“Programming Hive”一书中发现了一个更优雅的解决方案:

CREATE TABLE calculated_tbl AS
SELECT TRANSFORM(bigintfield1, bigintfield2, stringfield1, stringfield2)
USING 'python myudf.py'
AS (calculated_int BIGINT, modified_bif1 BIGINT, modified_bif2 BIGINT, stringfield1 STRING, stringfield2 STRING)
FROM original_tbl;

您可以直接在 AS(...) 行中指定类型,而不是强制转换。

【讨论】:

以上是关于在 Hadoop UDF 输出中保留列数据类型(流)的主要内容,如果未能解决你的问题,请参考以下文章

使用 Hive UDF 解压列数据

hadoop 流确保每个 reducer 一个键

pyspark 在 udf 中获取结构数据类型的字段名称

更改 DataFrame 中的列数据类型并将其传递到 UDF - PySpark

如何在 Amazon EMR 上的 pig 中使用 Python 流 UDF

所有列的 Pyspark 数据框数据类型由 UDF 更改为 String