spark-redshift - Error saving with Spark 2.1.0
【Title】: spark-redshift - Error saving with Spark 2.1.0
【Posted】: 2017-01-20 13:31:04
【Question】: I am using spark-redshift to load a Kafka stream that carries data events captured from a MySQL binlog.
When I try to save the RDD to Redshift, an exception is thrown:
command> ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0,databricks:spark-redshift:3.0.0-preview1,com.amazonaws:aws-java-sdk:1.11.80,org.apache.hadoop:hadoop-aws:2.7.2 processor.py
The processor code is:
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession, SQLContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
import json

def process(rdd, sc):
    try:
        # Each Kafka record is a (key, value) pair; the value is a Maxwell
        # JSON event whose 'data' field holds the changed row.
        dataset = rdd.map(lambda kv: json.loads(kv[1])['data']).collect()
        spark = SparkSession.builder.config(conf=sc.getConf()).getOrCreate()
        df = spark.createDataFrame(dataset)
        df.write.format("com.databricks.spark.redshift") \
            .option("url", "jdbc:redshift://XXXXXXX.us-east-1.redshift.amazonaws.com:5439/cerebro?user=XXXXXd&password=XXXXXX-") \
            .option("dbtable", "cur_date") \
            .option("tempdir", "s3n://BUCKET/stg/avro/cur_date/data") \
            .option("aws_iam_role", "arn:aws:iam::XXXXXXXX:role/XXXXXXXXXXX") \
            .option("extracopyoptions", "TIMEFORMAT AS 'MM.DD.YYYY HH:MI:SS'") \
            .mode("error") \
            .save()
        # df.write.format("com.databricks.spark.avro").save("/tmp/output")
    except Exception as e:
        print(e)
conf = SparkConf().setMaster("local[*]").setAppName("BinlogStreamProcessor")
sc = SparkContext(conf=conf)
sc._jsc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "XXXXXXXXXXXXXXXXXX")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")

ssc = StreamingContext(sc, 10)
zkQuorum = "localhost:32774,localhost:32775,localhost:32776"
topic = "maxwell"
# createStream expects a dict mapping topic -> number of partitions
stream = KafkaUtils.createStream(ssc, zkQuorum, "binlog-consumer", {topic: 1})
stream.foreachRDD(lambda rdd: process(rdd, sc))
ssc.start()
ssc.awaitTermination()
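For reference, a Maxwell binlog event is a JSON document whose data field holds the changed row, which is exactly what the map lambda above extracts. A minimal sketch of that extraction (the event below is a hypothetical sample, assuming Maxwell's default JSON output; the column names and values are made up):

import json

# Hypothetical Maxwell insert event for the cur_date table (values are illustrative)
event = '{"database": "cerebro", "table": "cur_date", "type": "insert", "ts": 1484918224, "data": {"id": 1, "cur_date": "01.20.2017 13:17:34"}}'

row = json.loads(event)['data']
print(row)  # {'id': 1, 'cur_date': '01.20.2017 13:17:34'}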
Error message:
17/01/20 13:17:34 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 3)
java.lang.AbstractMethodError: org.apache.spark.sql.execution.datasources.OutputWriterFactory.getFileExtension(Lorg/apache/hadoop/mapreduce/TaskAttemptContext;)Ljava/lang/String;
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.<init>(FileFormatWriter.scala:232)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:182)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/01/20 13:17:34 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
17/01/20 13:17:34 ERROR FileFormatWriter: Aborting job null.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 3, localhost, executor driver): java.lang.AbstractMethodError: org.apache.spark.sql.execution.datasources.OutputWriterFactory.getFileExtension(Lorg/apache/hadoop/mapreduce/TaskAttemptContext;)Ljava/lang/String;
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.<init>(FileFormatWriter.scala:232)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:182)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:127)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:121)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
at com.databricks.spark.redshift.RedshiftWriter.unloadData(RedshiftWriter.scala:295)
at com.databricks.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:392)
at com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:108)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:426)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.AbstractMethodError: org.apache.spark.sql.execution.datasources.OutputWriterFactory.getFileExtension(Lorg/apache/hadoop/mapreduce/TaskAttemptContext;)Ljava/lang/String;
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.<init>(FileFormatWriter.scala:232)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:182)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
I also tried other versions, such as 2.0.2 and 2.0.1, without success.
Is there a way to solve this on this version, or to get this code working with another version of Spark or spark-redshift? I am just getting started with Spark, so any hints would help.
【Answer 1】: I hit the same problem with Spark 2.1.0 and was able to resolve it by using Spark 2.0.0. OutputWriterFactory.getFileExtension() was added for 2.1.0 (see SPARK-18024): spark-redshift's writer was compiled against an OutputWriterFactory that did not yet have that method, so when Spark 2.1.0 calls it at runtime the JVM throws AbstractMethodError. Since the method does not exist before 2.1.0, I would guess this part also works on 2.0.1 and 2.0.2 (I haven't tried those, so I may be wrong). Another note is that spark-redshift appears to be built only against 2.0.0. This is its build script:
sparkVersion := "2.0.0",
testSparkVersion := sys.props.get("spark.testVersion").getOrElse(sparkVersion.value),
...
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % testSparkVersion.value % "test" exclude("org.apache.hadoop", "hadoop-client") force(),
  "org.apache.spark" %% "spark-sql" % testSparkVersion.value % "test" exclude("org.apache.hadoop", "hadoop-client") force(),
  "org.apache.spark" %% "spark-hive" % testSparkVersion.value % "test" exclude("org.apache.hadoop", "hadoop-client") force(),
  "com.databricks" %% "spark-avro" % testSparkAvroVersion.value % "test" exclude("org.apache.avro", "avro-mapred") force()
),
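A minimal way to apply this workaround is to submit the unchanged processor.py under a Spark 2.0.0 distribution and pin the Kafka integration to the matching version (a sketch, assuming a local spark-2.0.0-bin-hadoop2.7 install; the path and package versions are illustrative, not verified against this exact job):

./spark-2.0.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0,databricks:spark-redshift:3.0.0-preview1,com.amazonaws:aws-java-sdk:1.11.80,org.apache.hadoop:hadoop-aws:2.7.2 processor.py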