使用 Apache Spark 和 Java 按列分组并将每组字符串写入文本文件

Posted

技术标签:

【中文标题】使用 Apache Spark 和 Java 按列分组并将每组字符串写入文本文件【英文标题】:Group by column and write each group of strings to text file using Apache Spark and Java 【发布时间】:2019-11-01 21:09:34 【问题描述】:

我有一个 .csv 文件,其中包含 id 列和几个字符串列。我想按 id 分组,然后将 string_column1 中的所有值写入文本文件(每个值在新行上)。最后,我希望文本文件的名称为“allstrings”+id。 我正在使用带有 Java 的 Apache Spark。

我尝试使用 groupBy("id").agg(collect_list("string_column1")) 但我得到“方法 collect_list(String) 对于 Main 类型未定义”。 我不知道如何使用 id 列中的不同值来命名文本文件。

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.RelationalGroupedDataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class Main 

    public static void main(String[] args) 
        Logger.getLogger("org.apache").setLevel(Level.WARN);

        SparkSession spark = SparkSession.builder()
                .appName("testingSql")
                .master("local[*]")
                .getOrCreate();

        Dataset<Row> dataset = spark.read()
        .option("header", true)
        .csv("src/main/resources/maininput.csv");

        // make a separate .csv file for each group of strings (grouped by id),
        // with each string on a new line
        // and the name of the file should be "allstrings"+id
        RelationalGroupedDataset result = dataset.groupBy("id")
                .agg(collect_list("string_column1"))
                .?????????;



        spark.close();
    


【问题讨论】:

【参考方案1】:

您可以在写入时对数据进行分区,它将为每个组创建单独的目录id 每个文件夹的名称将采用 column_name=value 格式。

df.write.partitionBy("id").csv("output_directory")

然后您可以使用org.apache.hadoop.fs._ 重命名每个组目录中的文件。

【讨论】:

谢谢。知道为什么我在使用 groupBy("id").agg(collect_list("string_column1")) 时得到“方法 collect_list(String) 对于 Main 类型未定义”吗?这是scala方法吗?我可以在 Java 中使用什么? 您应该导入 sql.functions 以使用 collect_list 或做类似的事情:.agg(org.apache.spark.sql.functions.collect_list("string_column1")) 您好,我正在使用这种方法,速度很慢。有没有其他办法?

以上是关于使用 Apache Spark 和 Java 按列分组并将每组字符串写入文本文件的主要内容,如果未能解决你的问题,请参考以下文章

按列分组和排序csv文件spark [duplicate]

使用 Apache Spark 和 Java 将 CSV 解析为 DataFrame/DataSet

使用 Apache Spark SQL 和 Java 直接运行 sql 查询

如何在Spark提交中使用s3a和Apache spark 2.2(hadoop 2.8)?

使用java的apache spark中的决策树实现问题

使用java开发spark的wordcount程序