spark-redshift - Error when saving with Spark 2.1.0

Posted: 2017-01-20 13:31:04

I am using spark-redshift to load a Kafka stream that carries data events captured from a MySQL binlog.

When I try to save the RDD to Redshift, an exception is thrown:

command> ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0,databricks:spark-redshift:3.0.0-preview1,com.amazonaws:aws-java-sdk:1.11.80,org.apache.hadoop:hadoop-aws:2.7.2 processor.py

The processor code is:

from pyspark import SparkContext,SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession, SQLContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

import json

def process(rdd, sc):
  try:
      # Extract the 'data' payload of each Maxwell binlog event and collect it on the driver
      dataset = rdd.map(lambda (key, value): json.loads(value)['data']).collect()
      # Reuse (or create) a SparkSession bound to the streaming context's configuration
      spark = SparkSession.builder.config(conf=sc.getConf()).getOrCreate()

      df = spark.createDataFrame(dataset)
      df.write.format("com.databricks.spark.redshift") \
          .option("url","jdbc:redshift://XXXXXXX.us-east-1.redshift.amazonaws.com:5439/cerebro?user=XXXXXd&password=XXXXXX-") \
          .option("dbtable", "cur_date") \
          .option("tempdir", "s3n://BUCKET/stg/avro/cur_date/data") \
          .option("aws_iam_role","arn:aws:iam::XXXXXXXX:role/XXXXXXXXXXX") \
          .option("extracopyoptions", "TIMEFORMAT AS 'MM.DD.YYYY HH:MI:SS'") \
          .mode("error") \
          .save()
      #df.write.format("com.databricks.spark.avro").save("/tmp/output")
  except Exception as e:
      print(e)

conf = SparkConf().setMaster("local[*]").setAppName("BinlogStreamProcessor")

sc = SparkContext(conf=conf)
sc._jsc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "XXXXXXXXXXXXXXXXXX")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")

ssc = StreamingContext(sc, 10)

zkQuorum = "localhost:32774,localhost:32775,localhost:32776"
topic = "maxwell"

stream = KafkaUtils.createStream(ssc, zkQuorum, "binlog-consumer", {topic: 1})

stream.foreachRDD(lambda rdd: process(rdd, sc))

ssc.start()
ssc.awaitTermination()

Error message:

    17/01/20 13:17:34 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 3)
java.lang.AbstractMethodError: org.apache.spark.sql.execution.datasources.OutputWriterFactory.getFileExtension(Lorg/apache/hadoop/mapreduce/TaskAttemptContext;)Ljava/lang/String;
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.<init>(FileFormatWriter.scala:232)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:182)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/01/20 13:17:34 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
17/01/20 13:17:34 ERROR FileFormatWriter: Aborting job null.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 3, localhost, executor driver): java.lang.AbstractMethodError: org.apache.spark.sql.execution.datasources.OutputWriterFactory.getFileExtension(Lorg/apache/hadoop/mapreduce/TaskAttemptContext;)Ljava/lang/String;
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.<init>(FileFormatWriter.scala:232)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:182)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:127)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:121)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
    at com.databricks.spark.redshift.RedshiftWriter.unloadData(RedshiftWriter.scala:295)
    at com.databricks.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:392)
    at com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:108)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:426)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.AbstractMethodError: org.apache.spark.sql.execution.datasources.OutputWriterFactory.getFileExtension(Lorg/apache/hadoop/mapreduce/TaskAttemptContext;)Ljava/lang/String;
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.<init>(FileFormatWriter.scala:232)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:182)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more

I also tried other versions, such as 2.0.2 and 2.0.1, without success.

Is there a way to fix this on this version, or to make this code work with another version of Spark or spark-redshift? I am just getting started with Spark, so any hints would help.


Answer 1:

I ran into the same problem with Spark 2.1.0, but it went away when I used Spark 2.0.0. OutputWriterFactory.getFileExtension() was only added in 2.1.0 (see SPARK-18024), so I would expect 2.0.1 and 2.0.2 to work as well, although I have not tried them and may be wrong. Another note: spark-redshift appears to be certified only against Spark 2.0.0. Here is the relevant part of its build script:

sparkVersion := "2.0.0",
testSparkVersion := sys.props.get("spark.testVersion").getOrElse(sparkVersion.value),

...

  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % testSparkVersion.value % "test" exclude("org.apache.hadoop", "hadoop-client") force(),
    "org.apache.spark" %% "spark-sql" % testSparkVersion.value % "test" exclude("org.apache.hadoop", "hadoop-client") force(),
    "org.apache.spark" %% "spark-hive" % testSparkVersion.value % "test" exclude("org.apache.hadoop", "hadoop-client") force(),
    "com.databricks" %% "spark-avro" % testSparkAvroVersion.value % "test" exclude("org.apache.avro", "avro-mapred") force()
  ),
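If you keep the rest of the setup from the question, one way to apply this suggestion is to run against a Spark 2.0.0 distribution (i.e. ./bin/spark-submit from a Spark 2.0.0 install) and drop the Spark-version-dependent package to the 2.0.0 line. This is only a sketch based on the submit command in the question; the spark-redshift, AWS SDK, and hadoop-aws coordinates are copied from it unchanged, so adjust them if your environment differs:

command> ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0,databricks:spark-redshift:3.0.0-preview1,com.amazonaws:aws-java-sdk:1.11.80,org.apache.hadoop:hadoop-aws:2.7.2 processor.py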

