使用 Apache Spark 和 Java 将 CSV 解析为 DataFrame/DataSet

Posted

技术标签:

【中文标题】使用 Apache Spark 和 Java 将 CSV 解析为 DataFrame/DataSet【英文标题】:Parse CSV as DataFrame/DataSet with Apache Spark and Java 【发布时间】:2014-10-11 08:16:52 【问题描述】:

我是 spark 新手,我想使用 group-by 和 reduce 从 CSV 中找到以下内容(一行):

  Department, Designation, costToCompany, State
  Sales, Trainee, 12000, UP
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, TN
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, TN 
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, LA
  Marketing, Associate, 18000, TN
  Marketing, Associate, 18000, TN
  HR, Manager, 58000, TN

我想通过 Department、Designation、State 分组简化 about CSV,并添加带有 sum(costToCompany)TotalEmployeeCount 的附加列

应该得到如下结果:

  Dept, Desg, state, empCount, totalCost
  Sales,Lead,AP,2,64000
  Sales,Lead,LA,3,96000  
  Sales,Lead,TN,2,64000

有没有办法使用转换和动作来实现这一点。还是我们应该进行 RDD 操作?

【问题讨论】:

您能否整理一下 CSV 块(输入和结果),以便在标题和每一行之间清楚地分开?目前还不清楚一行的开始或结束位置。 查看way to do it with Spark 2.x + 【参考方案1】:

程序

创建一个类(架构)来封装您的结构(方法 B 不需要,但如果您使用 Java,它会使您的代码更易于阅读)

public class Record implements Serializable 
  String department;
  String designation;
  long costToCompany;
  String state;
  // constructor , getters and setters  

加载 CVS (JSON) 文件

JavaSparkContext sc;
JavaRDD<String> data = sc.textFile("path/input.csv");
//JavaSQLContext sqlContext = new JavaSQLContext(sc); // For previous versions 
SQLContext sqlContext = new SQLContext(sc); // In Spark 1.3 the Java API and Scala API have been unified


JavaRDD<Record> rdd_records = sc.textFile(data).map(
  new Function<String, Record>() 
      public Record call(String line) throws Exception 
         // Here you can use JSON
         // Gson gson = new Gson();
         // gson.fromJson(line, Record.class);
         String[] fields = line.split(",");
         Record sd = new Record(fields[0], fields[1], fields[2].trim(), fields[3]);
         return sd;
      
);

此时你有两种方法:

A. SparkSQL

注册一个表(使用您定义的架构类)

JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class);
table.registerAsTable("record_table");
table.printSchema();

使用所需的 Query-group-by 查询表

JavaSchemaRDD res = sqlContext.sql("
  select department,designation,state,sum(costToCompany),count(*) 
  from record_table 
  group by department,designation,state
");

在这里,您还可以使用 SQL 方法执行您想要的任何其他查询

B.火花

使用复合键映射:Department,Designation,State

JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD = 
rdd_records.mapToPair(new
  PairFunction<Record, String, Tuple2<Long, Integer>>()
    public Tuple2<String, Tuple2<Long, Integer>> call(Record record)
      Tuple2<String, Tuple2<Long, Integer>> t2 = 
      new Tuple2<String, Tuple2<Long,Integer>>(
        record.Department + record.Designation + record.State,
        new Tuple2<Long, Integer>(record.costToCompany,1)
      );
      return t2;

);

reduceByKey使用复合键,对costToCompany列求和,按key累加记录数

JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records = 
 records_JPRDD.reduceByKey(new Function2<Tuple2<Long, Integer>, Tuple2<Long,
 Integer>, Tuple2<Long, Integer>>() 
    public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1,
    Tuple2<Long, Integer> v2) throws Exception 
        return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2+ v2._2);
    
);

【讨论】:

考虑到 B 方法也在使用 Record 类和加载步骤。我不确定错误指的是什么符号,您能否包含完整的错误跟踪?也可能是由于您的数据输入文件,您是否更改了输入中的某些内容? 嗨@emecas 感谢您的出色回答。我正在使用您的代码,出于某种原因,表是空的,没有架构。 JavaSchemaRDD 表 = sqlContext.applySchema(rdd_records, Record.class);当我保存 table.saveAsTextFile() 时,它会在部分文件的所有行中打印 []。 别忘了填写以下部分:(构造函数)、架构类(记录)上的 getter 和 setter .. 见 @user449355 Q/A ***.com/a/30103554/833336【参考方案2】:

可以使用 Spark 内置的 CSV 阅读器解析 CSV 文件。它会回来 成功读取文件时的 DataFrame/DataSet。在之上 DataFrame/DataSet,您可以轻松应用类似 SQL 的操作。

在 Java 中使用 Spark 2.x(及更高版本)

创建 SparkSession 对象又名spark

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
    .builder()
    .appName("Java Spark SQL Example")
    .getOrCreate();

使用StructType 为行创建架构

import org.apache.spark.sql.types.StructType;

StructType schema = new StructType()
    .add("department", "string")
    .add("designation", "string")
    .add("ctc", "long")
    .add("state", "string");

从 CSV 文件创建数据框并将架构应用到它

Dataset<Row> df = spark.read()
    .option("mode", "DROPMALFORMED")
    .schema(schema)
    .csv("hdfs://path/input.csv");

more option on reading data from CSV file

现在我们可以通过两种方式聚合数据

1。 SQL方式

在spark sql metastore中注册一个表来执行SQL操作

df.createOrReplaceTempView("employee");

在注册的数据帧上运行 SQL 查询

Dataset<Row> sqlResult = spark.sql(
    "SELECT department, designation, state, SUM(ctc), COUNT(department)" 
        + " FROM employee GROUP BY department, designation, state");

sqlResult.show(); //for testing

我们甚至可以execute SQL directly on CSV file with out creating table with Spark SQL


2。对象链接或编程或类似Java的方式

为 sql 函数做必要的导入

import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.sum;

在数据帧/数据集上使用groupByagg 来执行countsum关于数据

Dataset<Row> dfResult = df.groupBy("department", "designation", "state")
    .agg(sum("ctc"), count("department"));
// After Spark 1.6 columns mentioned in group by will be added to result by default

dfResult.show();//for testing

依赖库

"org.apache.spark" % "spark-core_2.11" % "2.0.0" 
"org.apache.spark" % "spark-sql_2.11" % "2.0.0"

【讨论】:

使用 Spark 2.x 和 Scala 读取 CSV:***.com/a/39533431/1592191【参考方案3】:

以下内容可能不完全正确,但它应该让您了解如何处理数据。它不漂亮,应该用案例类等替换,但作为如何使用 spark api 的快速示例,我希望它就足够了:)

val rawlines = sc.textfile("hdfs://.../*.csv")
case class Employee(dep: String, des: String, cost: Double, state: String)
val employees = rawlines
  .map(_.split(",") /*or use a proper CSV parser*/
  .map( Employee(row(0), row(1), row(2), row(3) )

# the 1 is the amount of employees (which is obviously 1 per line)
val keyVals = employees.map( em => (em.dep, em.des, em.state), (1 , em.cost))

val results = keyVals.reduceByKey a,b =>
    (a._1 + b._1, b._1, b._2) # (a.count + b.count , a.cost + b.cost )


#debug output
results.take(100).foreach(println)

results
  .map( keyval => someThingToFormatAsCsvStringOrWhatever )
  .saveAsTextFile("hdfs://.../results")

或者你可以使用 SparkSQL:

val sqlContext = new SQLContext(sparkContext)

# case classes can easily be registered as tables
employees.registerAsTable("employees")

val results = sqlContext.sql("""select dep, des, state, sum(cost), count(*) 
  from employees 
  group by dep,des,state"""

【讨论】:

感谢您的快速响应,我想要按结果分组,例如 mysql 中的 ex 选择 Dept,designation,state,sum(costToCompany) from employeeTable group by Dept,Designation,state;不仅仅是像销售这样的部门 然后直接跳过过滤步骤。我已经相应地更新了代码。目标是将行转换为键值元素,其中键包含要分组的标识符,值包含要减少的值。在这种情况下,我们按部门、职务和州对事物进行分组,并且我们想将员工人数与成本相加,所以这些就是值。 非常感谢,我会试试的。你拯救了我的一天!【参考方案4】:

对于 JSON,如果您的文本文件每行包含一个 JSON 对象,您可以使用 sqlContext.jsonFile(path) 让 Spark SQL 将其加载为 SchemaRDD(将自动推断架构)。然后,您可以将其注册为表并使用 SQL 进行查询。您还可以手动将文本文件加载为RDD[String],每条记录包含一个 JSON 对象,并使用sqlContext.jsonRDD(rdd) 将其转换为SchemaRDD。当您需要对数据进行预处理时,jsonRDD 很有用。

【讨论】:

以上是关于使用 Apache Spark 和 Java 将 CSV 解析为 DataFrame/DataSet的主要内容,如果未能解决你的问题,请参考以下文章

使用java将oracle数据库连接到apache spark时出错

使用 Java 将数据存储为 Apache Spark 中的配置单元表

将 CSV 数据加载到 Dataframe 并使用 Apache Spark (Java) 转换为 Array

在 Apache Spark 中,用 Java 将数据帧写入 Hive 表

如何使用 Java 将 unix 纪元的列转换为 Apache spark DataFrame 中的日期?

使用 Apache Spark 和 Java 按列分组并将每组字符串写入文本文件