PySpark 中的二元计数

Posted

技术标签:

【中文标题】PySpark 中的二元计数【英文标题】:Bigram counting in PySpark 【发布时间】:2018-05-28 19:37:30 【问题描述】:

我正在尝试在 PySpark 中拼凑一个二元计数程序,该程序接受一个文本文件并输出每个正确二元的频率(一个句子中的两个连续单词)。

from pyspark.ml.feature import NGram

with use_spark_session("Bigrams") as spark:
    text_file = spark.sparkContext.textFile(text_path)
    sentences = text_file.flatMap(lambda line: line.split(".")) \
                        .filter(lambda line: len(line) > 0) \
                        .map(lambda line: (0, line.strip().split(" ")))  
    sentences_df = sentences.toDF(schema=["id", "words"])    
    ngram_df = NGram(n=2, inputCol="words", outputCol="bigrams").transform(sentences_df)

ngram_df.select("bigrams") 现在包含:

+--------------------+
|             bigrams|
+--------------------+
|[April is, is the...|
|[It is, is one, o...|
|[April always, al...|
|[April always, al...|
|[April's flowers,...|
|[Its birthstone, ...|
|[The meaning, mea...|
|[April comes, com...|
|[It also, also co...|
|[April begins, be...|
|[April ends, ends...|
|[In common, commo...|
|[In common, commo...|
|[In common, commo...|
|[In years, years ...|
|[In years, years ...|
+--------------------+

所以每个句子都有一个二元组列表。现在需要计算不同的二元组。如何?此外,整个代码看起来仍然过于冗长,所以我很乐意看到更简洁的解决方案。

【问题讨论】:

【参考方案1】:

如果您已经使用 RDD API,您可以继续跟进

bigrams = text_file.flatMap(lambda line: line.split(".")) \
                   .map(lambda line: line.strip().split(" ")) \
                   .flatMap(lambda xs: (tuple(x) for x in zip(xs, xs[1:])))

bigrams.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

否则:

from pyspark.sql.functions import explode

ngram_df.select(explode("bigrams").alias("bigram")).groupBy("bigram").count()

【讨论】:

以上是关于PySpark 中的二元计数的主要内容,如果未能解决你的问题,请参考以下文章

如何在 pyspark 中调整 GBTClassifier 的阈值

如何在 Pyspark 中以编程方式使用“计数”?

将每一行的值汇总为布尔值(PySpark)

PySpark 计数 groupby 与 None 键

Pyspark:通过检查值是不是存在来聚合数据(不是计数或总和)

滚动计数器 24 小时时间戳 - pyspark