PySpark DataFrames - 在不转换为 Pandas 的情况下进行枚举的方法?

Posted

技术标签:

【中文标题】PySpark DataFrames - 在不转换为 Pandas 的情况下进行枚举的方法?【英文标题】:PySpark DataFrames - way to enumerate without converting to Pandas? 【发布时间】:2015-09-24 12:06:15 【问题描述】:

我有一个非常大的 pyspark.sql.dataframe.DataFrame,名为 df。 我需要某种枚举记录的方法——因此,能够访问具有特定索引的记录。 (或选择具有索引范围的记录组)

在熊猫中,我可以只做

indexes=[2,3,6,7] 
df[indexes]

这里我想要类似的东西,(并且没有将数据框转换为熊猫)

我能到达的最近的是:

通过以下方式枚举原始数据框中的所有对象:

indexes=np.arange(df.count())
df_indexed=df.withColumn('index', indexes)
使用 where() 函数搜索我需要的值。

问题:

    为什么它不起作用以及如何使它起作用?如何向数据框添加一行?

    以后可以做类似的东西吗:

     indexes=[2,3,6,7] 
     df1.where("index in indexes").collect()
    

    有没有更快更简单的处理方法?

【问题讨论】:

【参考方案1】:

它不起作用,因为:

    withColumn 的第二个参数应该是 Column 而不是集合。 np.array 在这里不起作用 当您将 "index in indexes" 作为 SQL 表达式传递给 where 时,indexes 超出范围且未解析为有效标识符

PySpark >= 1.4.0

您可以使用相应的窗口函数添加行号,并使用Column.isin 方法或格式正确的查询字符串进行查询:

from pyspark.sql.functions import col, rowNumber
from pyspark.sql.window import Window

w = Window.orderBy()
indexed = df.withColumn("index", rowNumber().over(w))

# Using DSL
indexed.where(col("index").isin(set(indexes)))

# Using SQL expression
indexed.where("index in (0)".format(",".join(str(x) for x in indexes)))

看起来像在没有PARTITION BY 子句的情况下调用的窗口函数将所有数据移动到单个分区,所以上面可能不是最好的解决方案。

有没有更快更简单的处理方法?

不是真的。 Spark DataFrames 不支持随机行访问。

PairedRDD 可以使用lookup 方法访问,如果使用HashPartitioner 对数据进行分区,则该方法相对较快。还有一个支持高效查找的indexed-rdd 项目。

编辑

独立于 PySpark 版本,您可以尝试以下操作:

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType

row = Row("char")
row_with_index = Row("char", "index")

df = sc.parallelize(row(chr(x)) for x in range(97, 112)).toDF()
df.show(5)

## +----+
## |char|
## +----+
## |   a|
## |   b|
## |   c|
## |   d|
## |   e|
## +----+
## only showing top 5 rows

# This part is not tested but should work and save some work later
schema  = StructType(
    df.schema.fields[:] + [StructField("index", LongType(), False)])

indexed = (df.rdd # Extract rdd
    .zipWithIndex() # Add index
    .map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])) # Map to rows
    .toDF(schema)) # It will work without schema but will be more expensive

# inSet in Spark < 1.3
indexed.where(col("index").isin(indexes))

【讨论】:

你好@zero323,我试过sn-p。一切正常,除了 indexed.where(col("index").inSet(indexes)) 不起作用。它为我返回TypeError: 'Column' object is not callable。如果我想查询多个索引,你有关于 sn-p 的更新吗?【参考方案2】:

如果您想要一个保证不会发生冲突但不需要.over(partitionBy()) 的数字范围,那么您可以使用monotonicallyIncreasingId()

from pyspark.sql.functions import monotonicallyIncreasingId
df.select(monotonicallyIncreasingId().alias("rowId"),"*")

请注意,这些值并不是特别“整洁”。每个分区都有一个值范围,输出不会是连续的。例如。 0, 1, 2, 8589934592, 8589934593, 8589934594

这是 2015 年 4 月 28 日添加到 Spark 的:https://github.com/apache/spark/commit/d94cd1a733d5715792e6c4eac87f0d5c81aebbe2

【讨论】:

【参考方案3】:

您当然可以添加一个用于索引的数组,一个您选择的数组: 在 Scala 中,首先我们需要创建一个索引数组:

val index_array=(1 to df.count.toInt).toArray

index_array: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

您现在可以将此列附加到您的 DF。首先,为此,您需要打开我们的 DF 并将其作为一个数组获取,然后将其与您的 index_array 一起压缩,然后我们将新数组转换回 RDD。最后一步是将其作为 DF:

final_df = sc.parallelize((df.collect.map(
    x=>(x(0),x(1))) zip index_array).map(
    x=>(x._1._1.toString,x._1._2.toString,x._2))).
    toDF("column_name")

之后索引会更清晰。

【讨论】:

【参考方案4】:
from pyspark.sql.functions import monotonically_increasing_id
df.withColumn("Atr4", monotonically_increasing_id())

如果您只需要增量值(如 ID)并且没有 限制数字必须是连续的,你可以使用 monotonically_increasing_id()。使用时的唯一保证 功能是每行的值都会增加,但是, 每次执行时,值本身可能会有所不同。

【讨论】:

【参考方案5】:

monotonicallyIncreasingId() - 这将按递增顺序而不是按顺序分配行号。

2 列的示例输出:

|---------------------|------------------| | RowNo | Heading 2 | |---------------------|------------------| | 1 | xy | |---------------------|------------------| | 12 | xz | |---------------------|------------------|

如果您想分配行号,请使用以下技巧。

在 spark-2.0.1 和更高版本中测试。

df.createOrReplaceTempView("df") dfRowId = spark.sql("select *, row_number() over (partition by 0) as rowNo from df")

2 列的示例输出:

|---------------------|------------------| | RowNo | Heading 2 | |---------------------|------------------| | 1 | xy | |---------------------|------------------| | 2 | xz | |---------------------|------------------|

希望这会有所帮助。

【讨论】:

【参考方案6】:

选择 Pyspark DataFrame 的单行 n,尝试:

df.where(df.id == n).show()

给定一个 Pyspark 数据帧:

df = spark.createDataFrame([(1, 143.5, 5.6, 28, 'M', 100000),\
                          (2, 167.2, 5.4, 45, 'M', None),\
                          (3, None , 5.2, None, None, None),\
                          ], ['id', 'weight', 'height', 'age', 'gender', 'income'])

选择第三行,试试:

df.where('id == 3').show()

或者:

df.where(df.id == 3).show()

选择具有行 ID 的多行(在本例中为第 2 行和第 3 行),尝试:

id = "2", "3"
df.where(df.id.isin(id)).show()

【讨论】:

这个答案被否决了,因为它没有解释如何创建 id 列。

以上是关于PySpark DataFrames - 在不转换为 Pandas 的情况下进行枚举的方法?的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark(Dataframes)逐行读取文件(将行转换为字符串)

在 PySpark 的两个不同 pyspark.sql.dataframes 中的两列中创建一个 pyspark.sql.dataframe

如何使用 Pyspark 和 Dataframes 查询 Elasticsearch 索引

Pyspark DataFrames 中的嵌套 SELECT 查询

PySpark:具有不同列的 DataFrames 的动态联合

使用 Pyspark / Dataframes 时,如何将谓词下推到 Cassandra 或限制请求的数据?