Cloudera Hadoop 2.6.0-cdh5.14.2 和结构化流

Posted

技术标签:

【中文标题】Cloudera Hadoop 2.6.0-cdh5.14.2 和结构化流【英文标题】:Cloudera Hadoop 2.6.0-cdh5.14.2 and Structured Streaming 【发布时间】:2018-11-14 15:34:32 【问题描述】:

是否有人能够使用外部库(主要是 spark-sql-*)在 Hadoop 2.6.0-cdh5.14.2 上启动结构化流式传输。

更新

在此之前:缺少我之前帖子中的信息:Spark 的版本为 2.3.0

根据我的远程朋友的建议,我这样做了:

    我从 python 迁移到 Scala(得到更好的支持,它是原生 Spark 语言) 我使用 Kafka 以外的其他来源运行结构化流。

我使用了一个简单的 csv 作为源:

$ export SPARK_KAFKA_VERSION=0.10
$ spark2-shell 

scala> import org.apache.spark.sql.Encoders
scala> case class Amazon(EventId:String, DOCOMOEntitlementId:String, AmazonSubscriptionId:String, AmazonPlanId:String, DOCOMOUserId:String, MerchantAccountKey:String, ResellerKey:String, Status:String, CreatedDate:String, EndDate:String, ActivatedDate:String, FailedDate:String, ExpiryDate:String, LastUpdated:String, dateTimeStart:String, dateTimeEnd:String, referrerSource:String, reasonCode:String)
scala> val schema = Encoders.product[Amazon].schema
scala> val data = spark.readStream.schema(schema).csv("/user/ale/csv.csv").as[Amazon]

scala> data.isStreaming 
res0: Boolean = true

scala> val ss = data.writeStream.outputMode("append").format("console")
scala> ss.start()

这段代码神奇地起作用了。

Cloudera 声称他们不支持结构化流,据此,我刚刚更改源的以下代码失败:

val data =spark.readStream.format("kafka")... 

引发此异常:

java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:635)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:159)
  ... 49 elided
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
  at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
  at scala.util.Try$.apply(Try.scala:192)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
  at scala.util.Try.orElse(Try.scala:84)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:618)
  ... 50 more

而且我只使用提供的 Cloudera 库(没有外部 jars)。请注意

     $ export SPARK_KAFKA_VERSION=0.10

用于强制使用 0.10 版本( spark-streaming-kafka-.. ),因为在集群上也存在 0.8 版本。但是没有 spark-sql-kafka jar。

此时,AFAIK,问题是我缺少正确的库(jar)。尽管 Cloudera 网站上有所有警告,但 Spark 2.3.0 看起来很健康。

那么...是否可以选择使用“非官方官方 Cloudera Jar”来解决此问题?有人找到了一个很好的 Jar 来部署解决这个问题的代码吗? cloudera 的 Jar 选项更好:内部政策拒绝将第三方 jar 与代码捆绑在一起。

另一个选项是使用 directStreaming 再次重新实现所有结构化流的东西。这是我喜欢避免的工作。

【问题讨论】:

如果您拥有集群的管理员权限,那么没有什么可以阻止您安装最新版本的 Spark 来运行结构化流式处理(事实上,您甚至不需要“安装”Spark,只需设置提交 YARN 作业) 我用更多有用的信息更新了问题,spark 已经是正确的版本,最后问题是关于 jars/cloudera 政策。 【参考方案1】:

我认为这就是我的问题的答案:

    来自 Cloudera 的库确实存在,它是 spark-sql-kafka-0-10_2.11-2.3.0.cloudera2.jar 如果 Kafka 在 Sentry 下,它将无法工作。禁用它。

遗憾的是,代码需要为每个查询创建新的 group.id

 18/11/15 10:51:25 WARN kafka010.KafkaOffsetReader: Error in attempt 1 getting Kafka offsets: org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-707ab780-c71c-408b-80dd-be1960a03dd6-360506181-driver-0
 18/11/15 10:51:27 WARN kafka010.KafkaOffsetReader: Error in attempt 2 getting Kafka offsets: org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-707ab780-c71c-408b-80dd-be1960a03dd6-360506181-driver-1
 18/11/15 10:51:28 WARN kafka010.KafkaOffsetReader: Error in attempt 3 getting Kafka offsets: org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-707ab780-c71c-408b-80dd-be1960a03dd6-360506181-driver-2
 18/11/15 10:51:29 ERROR streaming.MicroBatchExecution: Query [id = 099e897f-2a44-4a50-bc57-46f898e05174, runId = b010d8d8-7b73-4f71-8ca5-f3eda47149c6] terminated

Sentry 不会允许这些组访问数据。由于它是在 KafkaSourceProvider.scala 代码中编码的,因此无法避免它:

希望这可以节省其他人的时间。

【讨论】:

以上是关于Cloudera Hadoop 2.6.0-cdh5.14.2 和结构化流的主要内容,如果未能解决你的问题,请参考以下文章

HADOOP环境搭建

Hadoop的安装与环境配置

hadoop

hadoop(cdh版)下载安装即配置

hadoop安装

Spark环境搭建-----------HDFS分布式文件系统搭建