使用 Apache Spark 和 Java 按列分组并将每组字符串写入文本文件
Posted
技术标签:
【中文标题】使用 Apache Spark 和 Java 按列分组并将每组字符串写入文本文件【英文标题】:Group by column and write each group of strings to text file using Apache Spark and Java 【发布时间】:2019-11-01 21:09:34 【问题描述】:我有一个 .csv 文件,其中包含 id 列和几个字符串列。我想按 id 分组,然后将 string_column1 中的所有值写入文本文件(每个值在新行上)。最后,我希望文本文件的名称为“allstrings”+id。 我正在使用带有 Java 的 Apache Spark。
我尝试使用 groupBy("id").agg(collect_list("string_column1")) 但我得到“方法 collect_list(String) 对于 Main 类型未定义”。 我不知道如何使用 id 列中的不同值来命名文本文件。
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.RelationalGroupedDataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class Main
public static void main(String[] args)
Logger.getLogger("org.apache").setLevel(Level.WARN);
SparkSession spark = SparkSession.builder()
.appName("testingSql")
.master("local[*]")
.getOrCreate();
Dataset<Row> dataset = spark.read()
.option("header", true)
.csv("src/main/resources/maininput.csv");
// make a separate .csv file for each group of strings (grouped by id),
// with each string on a new line
// and the name of the file should be "allstrings"+id
RelationalGroupedDataset result = dataset.groupBy("id")
.agg(collect_list("string_column1"))
.?????????;
spark.close();
【问题讨论】:
【参考方案1】:您可以在写入时对数据进行分区,它将为每个组创建单独的目录id
每个文件夹的名称将采用 column_name=value 格式。
df.write.partitionBy("id").csv("output_directory")
然后您可以使用org.apache.hadoop.fs._
重命名每个组目录中的文件。
【讨论】:
谢谢。知道为什么我在使用 groupBy("id").agg(collect_list("string_column1")) 时得到“方法 collect_list(String) 对于 Main 类型未定义”吗?这是scala方法吗?我可以在 Java 中使用什么? 您应该导入 sql.functions 以使用 collect_list 或做类似的事情:.agg(org.apache.spark.sql.functions.collect_list("string_column1"))
您好,我正在使用这种方法,速度很慢。有没有其他办法?以上是关于使用 Apache Spark 和 Java 按列分组并将每组字符串写入文本文件的主要内容,如果未能解决你的问题,请参考以下文章
使用 Apache Spark 和 Java 将 CSV 解析为 DataFrame/DataSet
使用 Apache Spark SQL 和 Java 直接运行 sql 查询