PySpark:将 SchemaRDD 映射到 SchemaRDD

Posted

技术标签:

【中文标题】PySpark:将 SchemaRDD 映射到 SchemaRDD【英文标题】:PySpark: Map a SchemaRDD into a SchemaRDD 【发布时间】:2015-07-20 16:08:02 【问题描述】:

我正在将 JSON 对象文件加载为 PySpark SchemaRDD。我想改变对象的“形状”(基本上,我正在展平它们),然后插入到 Hive 表中。

我遇到的问题是以下返回 PipelinedRDD 而不是 SchemaRDD

log_json.map(flatten_function)

(其中log_jsonSchemaRDD)。

有没有办法保留类型、转换回所需类型或有效地从新类型插入?

【问题讨论】:

你能提供一些细节吗?您提到了SchemaRDD,所以我猜它是 Spark @zero323 输出是平的;输入不是。我们有火花 1.2。 【参考方案1】:

与其说是真正的解决方案,不如说是一个想法。假设您的数据如下所示:

data = [
    "foobar":
        "foo": 1, "bar": 2, "fozbaz": 
            "foz": 0, "baz": "b": -1, "a": -1, "z": -1
        ]

import json 
with open("foobar.json", "w") as fw:
    for record in data:
        fw.write(json.dumps(record))

首先让我们加载它并检查架构:

>>> srdd = sqlContext.jsonFile("foobar.json")
>>> srdd.printSchema()
root
 |-- foobar: struct (nullable = true)
 |    |-- bar: integer (nullable = true)
 |    |-- foo: integer (nullable = true)
 |    |-- fozbaz: struct (nullable = true)
 |    |    |-- baz: struct (nullable = true)
 |    |    |    |-- a: integer (nullable = true)
 |    |    |    |-- b: integer (nullable = true)
 |    |    |    |-- z: integer (nullable = true)
 |    |    |-- foz: integer (nullable = true)

现在我们按照Justin Pihony 的建议注册表并提取模式:

srdd.registerTempTable("srdd")
schema = srdd.schema().jsonValue()

我们可以使用类似下面的方法来扁平化模式,而不是扁平化数据:

def flatten_schema(schema):
    """Take schema as returned from schema().jsonValue()
    and return list of field names with full path"""
    def _flatten(schema, path="", accum=None):
        # Extract name of the current element
        name = schema.get("name")
        # If there is a name extend path
        if name is not None:
            path = "0.1".format(path, name) if path else name
        # It is some kind of struct
        if isinstance(schema.get("fields"), list):
            for field in schema.get("fields"):
                _flatten(field, path, accum)
        elif isinstance(schema.get("type"), dict):
            _flatten(schema.get("type"), path, accum)
        # It is an atomic type
        else:
            accum.append(path)
    accum = []
    _flatten(schema, "", accum)
    return  accum

添加小助手来格式化查询字符串:

def build_query(schema, df):
    select = ", ".join(
            "0 AS 1".format(field, field.replace(".", "_"))
            for field in flatten_schema(schema))
    return "SELECT 0 FROM 1".format(select, df)

最后的结果:

>>> sqlContext.sql(build_query(schema, "srdd")).printSchema()
root
 |-- foobar_bar: integer (nullable = true)
 |-- foobar_foo: integer (nullable = true)
 |-- foobar_fozbaz_baz_a: integer (nullable = true)
 |-- foobar_fozbaz_baz_b: integer (nullable = true)
 |-- foobar_fozbaz_baz_z: integer (nullable = true)
 |-- foobar_fozbaz_foz: integer (nullable = true)

免责声明:我没有尝试深入研究架构结构,所以很可能有些情况没有被flatten_schema 涵盖。

【讨论】:

我的问题不是如何展平架构。我的问题是如何将我的 RDD 保持为 SchemaRDD。 我理解并且我相信它实际上可以解决问题,而无需手动指定架构。输出是平面的,类型被保留并且模式已经应用。【参考方案2】:

看起来select在python中不可用,所以你必须registerTempTable并将其写成SQL语句,比如

`SELECT flatten(*) FROM TABLE`

在SQL中设置函数后

sqlCtx.registerFunction("flatten", lambda x: flatten_function(x))

@zero323 提出,可能不支持针对 * 的函数...so you can just create a function that takes in your data types and pass all of that in.

【讨论】:

我很确定在 * 上调用 udf 是不允许的。你有任何工作的例子吗?关于选择它从 1.3 开始可用。【参考方案3】:

解决办法是applySchema:

mapped = log_json.map(flatten_function)
hive_context.applySchema(mapped, flat_schema).insertInto(name)

其中 flat_schema 是一个 StructType 代表架构,其方式与您从 log_json.schema() 获得的方式相同(但显然是扁平化的)。

【讨论】:

【参考方案4】:

你可以试试这个...有点长但有效

def flat_table(df,table_name):
def rec(l,in_array,name):
    for i,v in enumerate(l):
        if isinstance(v['type'],dict):
            if 'fields' in v['type'].keys():
                rec(name=name+[v['name']],l=v['type']['fields'],in_array=False)
            if 'elementType' in v['type'].keys():
                rec(name=name+[v['name']],l=v['type']['elementType']['fields'],in_array=True)
        else:#recursia stop rule
            #if this is an array so we need to explode every element in the array
            if in_array:
                field_list.append('nodesubnode.array'.format(node=".".join(name)+'.' if name else '', subnode=v['name']))
            else:
                field_list.append('nodesubnode'.format(node=".".join(name)+'.' if name else '', subnode=v['name']))

   # table_name='x'
   field_list=[]
   l=df.schema.jsonValue()['fields']
   df.registerTempTable(table_name)
   rec(l,in_array=False,name=[table_name])

   #create the select satement

   inner_fileds=[]
   outer_fields=[]
   flag=True

   for x in field_list:
      f=x.split('.')
      if f[-1]<>'array':
        inner_fileds.append('field as name'.format(field=".".join(f),name=f[-1]))
        of=['a']+f[-1:]

        outer_fields.append('field as name'.format(field=".".join(of),name=of[-1]))
    else:
        if flag:#add the array to the inner query for expotion only once for every array field
            inner_fileds.append('explode(field) as name'.format(field=".".join(f[:-2]),name=f[-3]))
            flag=False

        of=['a']+f[-3:-1]
        outer_fields.append('field as name'.format(field=".".join(of),name=of[-1]))


   q="""select outer_fields
        from (select inner_fileds
        from table_name)      a""".format(outer_fields=',\n'.join(outer_fields),inner_fileds=',\n'.join(inner_fileds),table_name=table_name)
   return q

【讨论】:

以上是关于PySpark:将 SchemaRDD 映射到 SchemaRDD的主要内容,如果未能解决你的问题,请参考以下文章

将函数映射到 pyspark 数据框的多列

如何将字典中的值映射到 Pyspark 中的新列

如何使用 Pyspark 将一个 rdd 映射到另一个?

如何将每一列映射到pyspark数据框中的其他列?

如何将从逻辑回归模型获得的系数映射到pyspark中的特征名称

PySpark 将 Dataframe 作为额外参数传递给映射