如何在 Spark Java 中的数据集上应用地图功能

Posted

技术标签:

【中文标题】如何在 Spark Java 中的数据集上应用地图功能【英文标题】:How to apply map function on dataset in spark java 【发布时间】:2016-12-13 08:35:57 【问题描述】:

我的 CSV 文件:

YEAR,UTILITY_ID,UTILITY_NAME,OWNERSHIP,STATE_CODE,AMR_METERING_RESIDENTIAL,AMR_METERING_COMMERCIAL,AMR_METERING_INDUSTRIAL,AMR_METERING_TRANS,AMR_METERING_TOTAL,AMI_METERING_RESIDENTIAL,AMI_METERING_COMMERCIAL,AMI_METERING_INDUSTRIAL,AMI_METERING_TRANS,AMI_METERING_TOTAL,ENERGY_SERVED_RESIDENTIAL,ENERGY_SERVED_COMMERCIAL,ENERGY_SERVED_INDUSTRIAL,ENERGY_SERVED_TRANS,ENERGY_SERVED_TOTAL
2011,34,City of Abbeville - (SC),M,SC,880,14,,,894,,,,,,,,,,
2011,84,A & N Electric Coop,C,MD,135,25,,,160,,,,,,,,,,
2011,84,A & N Electric Coop,C,VA,31893,2107,0,,34000,,,,,,,,,,
2011,97,Adams Electric Coop,C,IL,8334,190,,,8524,,,,,0,,,,,0
2011,108,Adams-Columbia Electric Coop,C,WI,33524,1788,709,,36021,,,,,,,,,,
2011,118,Adams Rural Electric Coop, Inc,C,OH,7457,20,,,7477,,,,,,,,,,
2011,122,Village of Arcade,M,NY,3560,498,100,,4158,,,,,,,,,,
2011,155,Agralite Electric Coop,C,MN,4383,227,315,,4925,,,,,,,,,,

这里是 Spark 代码以读取 CSV 文件:

public class ReadFile8 

    public static void main(String[] args) throws IOException 

        SparkSession session = new SparkSession.Builder().appName("CsvReader").master("local").getOrCreate();

        //Data taken by Local System
        Dataset<Row> file8Data = session.read().format("com.databricks.spark.csv").option("header", "true").load("file:///home/kumar/Desktop/Eletricaldata/file8_2011.csv");

        // Register the DataFrame as a SQL temporary view
        file8Data.createOrReplaceTempView("EletricalFile8Data");
        file8Data.show();
    


如何在 Spark 中使用 Java 应用地图功能和平面地图功能?

【问题讨论】:

你的预期输出是什么? 使用地图和平面地图功能以及任何类型的分析来分析数据。 【参考方案1】:

您可以使用以下代码作为示例:

Dataset<Integer> years = file8Data.map((MapFunction<Row, Integer>) row -> row.<Integer>getAs("YEAR"), Encoders.INT());
Dataset<Integer> newYears = years.flatMap((FlatMapFunction<Integer, Integer>) year -> 
  return Arrays.asList(year + 1, year + 2).iterator();
, Encoders.INT());

【讨论】:

您的任何材料 sparkjava 请发给我。我的电子邮件。我的电子邮件是:kumara1223@gmail.com 不工作此代码。错误正在出现:java.lang.ClassCastException: java.lang.String 无法转换为 java.lang.Integer 这意味着 Spark 将“YEAR”列视为字符串。将inferSchema 设置为true,Spark 将尝试推断列类型或将上面代码中的Integer 替换为String 我没有任何具体的材料。我会推荐docs,您可以在其中选择示例语言。此外,this 应该会有所帮助。 在文档材料中,没有提供所有功能,仅提供了几个示例,如何使用所有功能进行代码。【参考方案2】:

如果 Encoders.INT() 不起作用,请改用此方法

编码器$.MODULE$.INT()

【讨论】:

以上是关于如何在 Spark Java 中的数据集上应用地图功能的主要内容,如果未能解决你的问题,请参考以下文章

如何在 spark 数据集上使用 group by

如何在 Python 中的 Spark Dataframe 上应用任何类型的地图转换

如何将地图 List<Map<String, String>> myList 列表转换为 Java 中的 Spark Dataframe?

如何将集合分组为数据集上的运算符/方法?

有没有办法在分区的 spark 数据集上并行运行操作?

如何解决嵌套地图函数中的 SPARK-5063