Spark 上的 SQL:如何获取 DISTINCT 的所有值?

Posted

技术标签:

【中文标题】Spark 上的 SQL:如何获取 DISTINCT 的所有值?【英文标题】:SQL on Spark: How do I get all values of DISTINCT? 【发布时间】:2016-03-20 17:45:20 【问题描述】:

所以,假设我有下表:

Name | Color
------------------------------
John | Blue
Greg | Red
John | Yellow
Greg | Red
Greg | Blue

我想为每个名称获取不同颜色的表格 - 有多少及其值。意思是这样的:

Name | Distinct | Values
--------------------------------------
John |   2      | Blue, Yellow
Greg |   2      | Red, Blue

有什么办法吗?

【问题讨论】:

这在您的 RDBMS 之后从 simplevery simple 有所不同,这是您错误地未指定的。所以请编辑您的答案并添加 RDBMS @ThomasG 公平地说是指定平台,它不是RDBMS。 In PySpark 1.5.0, how do you list all items of column `y` based on the values of column `x`?的可能重复 @zero323 这个问题正在寻找聚合作为 DISTINCT 集,而不是重复的列表。 【参考方案1】:

collect_list 将为您提供一个列表,而不会删除重复项。 collect_set 将自动删除重复项 所以就

select 
Name,
count(distinct color) as Distinct, # not a very good name
collect_set(Color) as Values
from TblName
group by Name

这个功能是从 spark 1.6.0 开始实现的,看看吧:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala

/**
   * Aggregate function: returns a set of objects with duplicate elements eliminated.
   *
   * For now this is an alias for the collect_set Hive UDAF.
   *
   * @group agg_funcs
   * @since 1.6.0
   */
  def collect_set(columnName: String): Column = collect_set(Column(columnName))

【讨论】:

dataframe api 类似:data.groupBy("name").agg($"name", countDistinct("color").as("uniq_color_count"), collect_list("color") .as("颜色"))【参考方案2】:

对于 PySpark;我来自 R/Pandas 背景,所以我实际上发现 Spark Dataframes 更容易使用。

为此:

    设置 Spark SQL 上下文 将文件读入数据框 将您的数据框注册为临时表 直接使用SQL语法查询 将结果保存为对象,输出到文件..做你的事

这是我为此创建的一个类:

class SQLspark():

def __init__(self, local_dir='./', hdfs_dir='/users/', master='local', appname='spark_app', spark_mem=2):
    self.local_dir = local_dir
    self.hdfs_dir = hdfs_dir
    self.master = master
    self.appname = appname
    self.spark_mem = int(spark_mem)
    self.conf = (SparkConf()
           .setMaster(self.master)
           .setAppName(self.appname)
           .set("spark.executor.memory", self.spark_mem))
    self.sc = SparkContext(conf=self.conf)
    self.sqlContext = SQLContext(self.sc)


def file_to_df(self, input_file):
    # import file as dataframe, all cols will be imported as strings
    df = self.sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("delimiter", "\t").option("inferSchema", "true").load(input_file)
    # # cache df object to avoid rebuilding each time
    df.cache()
    # register as temp table for querying, use 'spark_df' as table name
    df.registerTempTable("spark_df")
    return df

# you also cast a spark dataframe as a pandas df
def sparkDf_to_pandasDf(self, input_df):
    pandas_df = input_df.toPandas()
    return pandas_df

def find_distinct(self, col_name):
    my_query = self.sqlContext.sql("""SELECT distinct  FROM spark_df""".format(col_name))
   # now do your thing with the results etc
    my_query.show() 
    my_query.count()
    my_query.collect()

###############
if __name__ == '__main__':

# instantiate class 
# see function for variables to input
spark = TestETL(os.getcwd(), 'hdfs_loc', "local", "etl_test", 10)


# specify input file to process
tsv_infile = 'path/to/file'

【讨论】:

以上是关于Spark 上的 SQL:如何获取 DISTINCT 的所有值?的主要内容,如果未能解决你的问题,请参考以下文章

sql distinc con conchasoloaño

如何优化大数据框上的 spark sql 操作?

如何通过 Spark SQL 作为 JDBC 分布式查询引擎访问 RDD 表?

spark算子

Spark上的Hive如何从jdbc读取数据?

如何从 T-SQL 表中提取最新值