如何使用 Pyspark 创建列表 json?

Posted

技术标签:

【中文标题】如何使用 Pyspark 创建列表 json?【英文标题】:How to create a list json using Pyspark? 【发布时间】:2019-01-09 11:50:54 【问题描述】:

我正在尝试使用 Pyspark 创建一个具有以下结构的 json 文件。

目标输出:

[
    "Loaded_data": [
        "Loaded_numeric_columns": ["id", "val"],
        "Loaded_category_columns": ["name", "branch"]
    ],
    "enriched_data": [
        "enriched_category_columns": ["country__4"],
        "enriched_index_columns": ["id__1", "val__3"]
    ]
]

我可以为每个部分创建列表。请参考下面的代码。我有点卡在这里,请您帮忙。

样本数据:

input_data=spark.read.csv("/tmp/test234.csv",header=True, inferSchema=True)
def is_numeric(data_type):
    return data_type not in ('date', 'string', 'boolean')
def is_nonnumeric(data_type):
    return data_type in ('string')

sub="__"
Loaded_numeric_columns = [name for name, data_type in input_data.dtypes if is_numeric(data_type) and (sub not in name)]
print Loaded_numeric_columns
Loaded_category_columns = [name for name, data_type in input_data.dtypes if is_nonnumeric(data_type) and (sub not in name)]
print Loaded_category_columns
enriched_category_columns = [name for name, data_type in input_data.dtypes if is_nonnumeric(data_type) and (sub in name)]
print enriched_category_columns
enriched_index_columns = [name for name, data_type in input_data.dtypes if is_numeric(data_type) and (sub in name)]
print enriched_index_columns

【问题讨论】:

您可以使用您的示例数据输出所需的 json 文件吗? 【参考方案1】:

您可以使用 structarray 创建新的列类型:

from pyspark.sql import functions as F

df.show()

+---+-----+-------+------+----------+-----+-------+
| id|  val|   name|branch|country__4|id__1| val__3|
+---+-----+-------+------+----------+-----+-------+
|  1|67.87|Shankar|     a|         1|67.87|Shankar|
+---+-----+-------+------+----------+-----+-------+



df.select(
  F.struct(
    F.array(F.col("id"), F.col("val")).alias("Loaded_numeric_columns"),
    F.array(F.col("name"), F.col("branch")).alias("Loaded_category_columns"),
  ).alias("Loaded_data"),
  F.struct(
    F.array(F.col("country__4")).alias("enriched_category_columns"),
    F.array(F.col("id__1"), F.col("val__3")).alias("enriched_index_columns"),
  ).alias("enriched_data"),
).printSchema()

root
 |-- Loaded_data: struct (nullable = false)
 |    |-- Loaded_numeric_columns: array (nullable = false)
 |    |    |-- element: double (containsNull = true)
 |    |-- Loaded_category_columns: array (nullable = false)
 |    |    |-- element: string (containsNull = true)
 |-- enriched_data: struct (nullable = false)
 |    |-- enriched_category_columns: array (nullable = false)
 |    |    |-- element: long (containsNull = true)
 |    |-- enriched_index_columns: array (nullable = false)
 |    |    |-- element: string (containsNull = true)

【讨论】:

以上是关于如何使用 Pyspark 创建列表 json?的主要内容,如果未能解决你的问题,请参考以下文章

如何将 json 对象列表转换为单个 pyspark 数据框?

如何使用 pySpark 使多个 json 处理更快?

Pyspark 将 json 数组转换为数据帧行

如何解析 pyspark 的 DataStreamReader 中的 json 字符串列并创建数据框

如何使方法 JSON 可序列化以在自定义 Pyspark 转换器中使用

如何在不使用for循环的情况下从pyspark中的列表创建数据框?