Cloudera Hadoop 2.6.0-cdh5.14.2 和结构化流
Posted
技术标签:
【中文标题】Cloudera Hadoop 2.6.0-cdh5.14.2 和结构化流【英文标题】:Cloudera Hadoop 2.6.0-cdh5.14.2 and Structured Streaming 【发布时间】:2018-11-14 15:34:32 【问题描述】:是否有人能够使用外部库(主要是 spark-sql-*)在 Hadoop 2.6.0-cdh5.14.2 上启动结构化流式传输。
更新
在此之前:缺少我之前帖子中的信息:Spark 的版本为 2.3.0
根据我的远程朋友的建议,我这样做了:
-
我从 python 迁移到 Scala(得到更好的支持,它是原生 Spark 语言)
我使用 Kafka 以外的其他来源运行结构化流。
我使用了一个简单的 csv 作为源:
$ export SPARK_KAFKA_VERSION=0.10
$ spark2-shell
scala> import org.apache.spark.sql.Encoders
scala> case class Amazon(EventId:String, DOCOMOEntitlementId:String, AmazonSubscriptionId:String, AmazonPlanId:String, DOCOMOUserId:String, MerchantAccountKey:String, ResellerKey:String, Status:String, CreatedDate:String, EndDate:String, ActivatedDate:String, FailedDate:String, ExpiryDate:String, LastUpdated:String, dateTimeStart:String, dateTimeEnd:String, referrerSource:String, reasonCode:String)
scala> val schema = Encoders.product[Amazon].schema
scala> val data = spark.readStream.schema(schema).csv("/user/ale/csv.csv").as[Amazon]
scala> data.isStreaming
res0: Boolean = true
scala> val ss = data.writeStream.outputMode("append").format("console")
scala> ss.start()
这段代码神奇地起作用了。
Cloudera 声称他们不支持结构化流,据此,我刚刚更改源的以下代码失败:
val data =spark.readStream.format("kafka")...
引发此异常:
java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:635)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:159)
... 49 elided
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:618)
... 50 more
而且我只使用提供的 Cloudera 库(没有外部 jars)。请注意
$ export SPARK_KAFKA_VERSION=0.10
用于强制使用 0.10 版本( spark-streaming-kafka-.. ),因为在集群上也存在 0.8 版本。但是没有 spark-sql-kafka jar。
此时,AFAIK,问题是我缺少正确的库(jar)。尽管 Cloudera 网站上有所有警告,但 Spark 2.3.0 看起来很健康。
那么...是否可以选择使用“非官方官方 Cloudera Jar”来解决此问题?有人找到了一个很好的 Jar 来部署解决这个问题的代码吗? cloudera 的 Jar 选项更好:内部政策拒绝将第三方 jar 与代码捆绑在一起。
另一个选项是使用 directStreaming 再次重新实现所有结构化流的东西。这是我喜欢避免的工作。
【问题讨论】:
如果您拥有集群的管理员权限,那么没有什么可以阻止您安装最新版本的 Spark 来运行结构化流式处理(事实上,您甚至不需要“安装”Spark,只需设置提交 YARN 作业) 我用更多有用的信息更新了问题,spark 已经是正确的版本,最后问题是关于 jars/cloudera 政策。 【参考方案1】:我认为这就是我的问题的答案:
-
来自 Cloudera 的库确实存在,它是 spark-sql-kafka-0-10_2.11-2.3.0.cloudera2.jar
如果 Kafka 在 Sentry 下,它将无法工作。禁用它。
遗憾的是,代码需要为每个查询创建新的 group.id
18/11/15 10:51:25 WARN kafka010.KafkaOffsetReader: Error in attempt 1 getting Kafka offsets: org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-707ab780-c71c-408b-80dd-be1960a03dd6-360506181-driver-0
18/11/15 10:51:27 WARN kafka010.KafkaOffsetReader: Error in attempt 2 getting Kafka offsets: org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-707ab780-c71c-408b-80dd-be1960a03dd6-360506181-driver-1
18/11/15 10:51:28 WARN kafka010.KafkaOffsetReader: Error in attempt 3 getting Kafka offsets: org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-707ab780-c71c-408b-80dd-be1960a03dd6-360506181-driver-2
18/11/15 10:51:29 ERROR streaming.MicroBatchExecution: Query [id = 099e897f-2a44-4a50-bc57-46f898e05174, runId = b010d8d8-7b73-4f71-8ca5-f3eda47149c6] terminated
Sentry 不会允许这些组访问数据。由于它是在 KafkaSourceProvider.scala 代码中编码的,因此无法避免它:
希望这可以节省其他人的时间。
【讨论】:
以上是关于Cloudera Hadoop 2.6.0-cdh5.14.2 和结构化流的主要内容,如果未能解决你的问题,请参考以下文章