Apache Spark 聚合函数在运行时遇到 ArrayIndexOutOfBoundsException

Posted

技术标签:

【中文标题】Apache Spark 聚合函数在运行时遇到 ArrayIndexOutOfBoundsException【英文标题】:Apache Spark Aggregate functions running into ArrayIndexOutOfBoundsException during runtime 【发布时间】:2017-08-28 19:19:41 【问题描述】:

在将我的 java spark 程序部署到集群期间,它遇到了 ArrayIndexOutOfBoundsException: 11 异常

据我了解,我的代码编写方式在语法上没有任何问题,而且索引错误并不能说明问题所在。我的程序只是应该能够采用 12 列,用空格分隔,从那时起它需要采用 ONE 列(命令列)并进行聚合以查看每个命令存在多少次,即

column1 column2 command column3 ect
dggd     gdegdg  cmd#1   533    ect
dggd     gdegdg  cmd#1   533    ect
dggd     gdegdg  cmd#2   534    ect
dggd     gdegdg  cmd#5   5353   ect
dggd     gdegdg  cmd#2   533    ect

看起来像

commmand    count
command#1    5
command#2    15
command#5    514

我在跑步 火花2.1 高密度板 2.6 这是我到目前为止的代码

public class Main 
public static void main(String[] args) 
    //functions fu = new functions();

 JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("appName").setMaster("local[*]"));
 SparkSession spark = SparkSession
            .builder()
            .appName("Log File Reader")
            .getOrCreate();

    JavaRDD<String> logsRDD = spark.sparkContext()
            .textFile(args[0],1)
            .toJavaRDD();

    String schemaString = "date time command service responseCode bytes ip dash1 dash2 dash3 num dash4";

    List<StructField> fields = new ArrayList<>();
    String[] fieldName = schemaString.split(" ");

    for (String field : fieldName)
        fields.add(DataTypes.createStructField(field, DataTypes.StringType, true));
    

    StructType schema = DataTypes.createStructType(fields);

    JavaRDD<Row> rowRDD = logsRDD.map((Function<String, Row>) record -> 
       String[] attributes = record.split(" ");
       return RowFactory.create(attributes[0],attributes[1],attributes[2],attributes[3],attributes[4],attributes[5],
               attributes[6],attributes[7],attributes[8],attributes[9],
               attributes[10],attributes[11]);
    );

    Dataset<Row> dataDF = spark.createDataFrame(rowRDD, schema);

    dataDF.createOrReplaceTempView("data");

    //shows the top 20 rows from the dataframe including all columns
    Dataset<Row> showDF = spark.sql("select * from data");

    //shows the top 20 columns from the same dataframe, but only displays 
    //the command column
    Dataset<Row> commandDF = spark.sql("select command from data");
    showDF.show();
    commandDF.show();

此代码运行良好,但是当我尝试使用如下代码查找最终结果时,它会遇到索引错误。

logsDF.groupBy(col("command")).count().show();
Dataset<Row> ans = spark.sql("select command, count(*) from logs group by command").show();

最后是 spark-submit 代码

spark-submit --class com.ect.java.Main /path/application.jar hdfs:///path/textfile.txt

在我看来,这是一个环境问题,但找不到任何与此相关问题的文档

【问题讨论】:

【参考方案1】:

问题不在于聚合函数。问题在于您的日志文件。你得到错误

Caused by: java.lang.ArrayIndexOutOfBoundsException: 11 

这意味着您的日志文件中的一行有 11 个条目,而不是 12 个条目,这是您的程序所要求的。您可以通过创建示例 log.txt 文件并在该文件中保留两行来验证这一点。

您的分组代码应如下所示(看起来像是错字)。在您的示例应用程序中,您有dataDF 而不是logsDF。临时表名称是 data 而不是 logs

dataDF.groupBy(col("command")).count().show();
Dataset<Row> ans = spark.sql("select command, count(*) from data group by command");
ans.show();

【讨论】:

谢谢你这是正确的,能够错误检查传入的数据以解决这个问题

以上是关于Apache Spark 聚合函数在运行时遇到 ArrayIndexOutOfBoundsException的主要内容,如果未能解决你的问题,请参考以下文章

org.apache.spark.sql.AnalysisException:表达式 't2.`sum_click_passed`' 既不在 group by 中,也不是聚合函数

极简spark教程spark聚合函数

Spark 系列—— Spark SQL 聚合函数 Aggregations

Spark-->combineByKey请阅读Apache spark官网文档

如何将数组传递给 Spark (UDAF) 中的用户定义聚合函数

Spark SQL内置函数