Compiling Hadoop to support Spark reads and writes to OSS (CDH 5.x)

Posted by 来世愿做友人_A

Preface

Background: use Spark to read files from HDFS and write them to OSS.
hadoop: 2.6.0-cdh5.15.1
spark: 2.4.1
Main reference: https://blog.csdn.net/wankund...
This post adds some extra notes and pitfalls on top of that write-up.
Compiling hadoop-aliyun
Newer Hadoop releases support Aliyun OSS access out of the box, but this version does not, so the hadoop-aliyun module has to be compiled in.
  • Pull the Hadoop trunk branch and copy the hadoop-tools/hadoop-aliyun module into the corresponding location in the CDH source tree
  • Modify hadoop-tools/pom.xml

    • Add the hadoop-aliyun sub-module: <module>hadoop-aliyun</module>
  • Change the Java version in the root pom.xml to 1.8, since hadoop-aliyun uses Java 8 lambda syntax (alternatively, rewrite those lambdas so the code compiles as-is)
  • Modify hadoop-aliyun/pom.xml: set the version, update the OSS and HTTP dependencies, and use the shade plugin to bundle those dependencies into the jar
  • Code changes

    • Change import org.apache.commons.lang3 to import org.apache.commons.lang
    • Copy the BlockingThreadPoolExecutorService and SemaphoredDelegatingExecutor classes from the (CDH) hadoop-aws module into the org.apache.hadoop.util package
  • Build the hadoop-aliyun module

    • mvn clean package -pl hadoop-tools/hadoop-aliyun

The final hadoop-aliyun/pom.xml looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-project</artifactId>
    <version>2.6.0-cdh5.15.1</version>
    <relativePath>../../hadoop-project</relativePath>
  </parent>
  <artifactId>hadoop-aliyun</artifactId>
  <name>Apache Hadoop Aliyun OSS support</name>
  <packaging>jar</packaging>

  <properties>
    <file.encoding>UTF-8</file.encoding>
    <downloadSources>true</downloadSources>
  </properties>

  <profiles>
    <profile>
      <id>tests-off</id>
      <activation>
        <file>
          <missing>src/test/resources/auth-keys.xml</missing>
        </file>
      </activation>
      <properties>
        <maven.test.skip>true</maven.test.skip>
      </properties>
    </profile>
    <profile>
      <id>tests-on</id>
      <activation>
        <file>
          <exists>src/test/resources/auth-keys.xml</exists>
        </file>
      </activation>
      <properties>
        <maven.test.skip>false</maven.test.skip>
      </properties>
    </profile>
  </profiles>

  <build>
    <plugins>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>findbugs-maven-plugin</artifactId>
        <configuration>
          <findbugsXmlOutput>true</findbugsXmlOutput>
          <xmlOutput>true</xmlOutput>
          <excludeFilterFile>${basedir}/dev-support/findbugs-exclude.xml
          </excludeFilterFile>
          <effort>Max</effort>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <configuration>
          <forkedProcessTimeoutInSeconds>3600</forkedProcessTimeoutInSeconds>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-dependency-plugin</artifactId>
        <executions>
          <execution>
            <id>deplist</id>
            <phase>compile</phase>
            <goals>
              <goal>list</goal>
            </goals>
            <configuration>
              <!-- build a shellprofile -->
              <outputFile>
                ${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-optional.txt
              </outputFile>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.0</version>
        <executions>
          <execution>
            <id>shade-aliyun-sdk-oss</id>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <shadedArtifactAttached>false</shadedArtifactAttached>
              <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
              <createDependencyReducedPom>true</createDependencyReducedPom>
              <createSourcesJar>true</createSourcesJar>
              <relocations>
                <relocation>
                  <pattern>org.apache.http</pattern>
                  <shadedPattern>com.xxx.thirdparty.org.apache.http</shadedPattern>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>com.aliyun.oss</groupId>
      <artifactId>aliyun-sdk-oss</artifactId>
      <version>3.4.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.4.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpcore</artifactId>
      <version>4.4.1</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <exclusions>
        <exclusion>
          <groupId>org.apache.httpcomponents</groupId>
          <artifactId>httpclient</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.httpcomponents</groupId>
          <artifactId>httpcore</artifactId>
        </exclusion>
      </exclusions>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <scope>test</scope>
      <type>test-jar</type>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-distcp</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-distcp</artifactId>
      <scope>test</scope>
      <type>test-jar</type>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-server-tests</artifactId>
      <scope>test</scope>
      <type>test-jar</type>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-examples</artifactId>
      <scope>test</scope>
      <type>jar</type>
    </dependency>
  </dependencies>

</project>

Reading and writing OSS files from Spark

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val inputPath = "hdfs:///xxx"
val outputPath = "oss://bucket/OSS_FILES"

val conf = new SparkConf()
conf.set("spark.hadoop.fs.oss.endpoint", "oss-cn-xxx")
conf.set("spark.hadoop.fs.oss.accessKeyId", "xxx")
conf.set("spark.hadoop.fs.oss.accessKeySecret", "xxx")
conf.set("spark.hadoop.fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem")
conf.set("spark.hadoop.fs.oss.buffer.dir", "/tmp/oss")
conf.set("spark.hadoop.fs.oss.connection.secure.enabled", "false")
conf.set("spark.hadoop.fs.oss.connection.maximum", "2048")

val spark = SparkSession.builder().config(conf).getOrCreate()

// read from HDFS (ORC is assumed here; adjust the reader to the actual input format)
// and write the result to OSS
val df = spark.read.orc(inputPath)
df.write.format("orc").mode("overwrite").save(outputPath)
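Because of the spark.hadoop. prefix, Spark strips the prefix and passes these entries into the Hadoop Configuration as fs.oss.* keys, which is where AliyunOSSFileSystem picks them up. As a rough sketch (not from the original post, same placeholder endpoint and keys as above), the equivalent keys can also be set directly on the running context's Hadoop configuration:

// equivalent Hadoop-level configuration, set after the SparkSession exists
val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("fs.oss.endpoint", "oss-cn-xxx")
hadoopConf.set("fs.oss.accessKeyId", "xxx")
hadoopConf.set("fs.oss.accessKeySecret", "xxx")
hadoopConf.set("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem")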
For other ways of accessing OSS, such as through Spark SQL or reading OSS on the HDFS side, see the third link in the references below.
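As a small illustration (not from the original post; the view name is a placeholder), once the fs.oss.* settings above are in place, the ORC output can be read back from OSS and queried with Spark SQL:

// read the ORC output back from OSS and query it with Spark SQL
val ossDf = spark.read.orc(outputPath)
ossDf.createOrReplaceTempView("oss_data")  // hypothetical view name
spark.sql("SELECT COUNT(*) FROM oss_data").show()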

spark-submit

spark-submit \
--class org.example.HdfsToOSS \
--master yarn \
--deploy-mode cluster \
--num-executors 2 \
--executor-cores 2 \
--executor-memory 3G \
--driver-cores 1 \
--driver-memory 3G \
--conf "spark.driver.extraClassPath=hadoop-common-2.6.0-cdh5.15.1.jar" \
--conf "spark.executor.extraClassPath=hadoop-common-2.6.0-cdh5.15.1.jar" \
--jars ./hadoop-aliyun-2.6.0-cdh5.15.1.jar,./hadoop-common-2.6.0-cdh5.15.1.jar \
./spark-2.4-worker-1.0-SNAPSHOT.jar
Pay attention to the extraClassPath settings: without any special configuration Spark loads its own bundled hadoop-common first, and a version mismatch can lead to ClassNotFoundException. Pointing extraClassPath at the correct jar makes it load with higher priority.

My knowledge here is limited, so corrections are welcome if anything is wrong.

References

  • Spark dependency loading order and conflict resolution
  • Spark Configuration
  • Modifying Spark to support remote access to OSS files
