怎样获得在yarn框架上运行jar包的执行结果

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了怎样获得在yarn框架上运行jar包的执行结果相关的知识,希望对你有一定的参考价值。

配置方法
(1) 首先需要确保spark在1.1.0以上的版本。
(2) 在HDFS上建立一个公共lib库,比如/system/spark-lib/,设置权限为755。把spark-assembly-*.jar上传到公共lib库中。
(3) 在spark-env.sh中配置:

view plaincopy to clipboardprint?
<span style="font-size:14px;">spark.yarn.jar hdfs://yarncluster/system/spark_lib/spark-assembly-1.1.0-hadoop2.3.0-cdh5.1.0.jarspark.yarn.preserve.staging.files false</span>

**spark.yarn.jar配置成HDFS上的公共lib库中的jar包。这个配置项会使提交job时,不是从本地上传spark-assembly*.jar包,而是从HDFS的一个目录复制到另一个目录(不确定HDFS上的复制是怎么操作的),总的来说节省了一点时间。(网上有的文章里说,这里的配置,会节省掉上传jar包的步骤,其实是不对的,只是把从本地上传的步骤改成了在HDFS上的复制操作。)
**spark.yarn.preserve.staging.files: 这个配置项配置成false,表示在执行结束后,不保留staging files,也就是两个jar包。然后HDFS上的.sparkStaging下的两个jar包在作业执行完成后就会被删除。如果配置成true,执行完后HDFS上的.sparkStaging下两个jar包都会保存下来。
然后再运行,发现HDFS上.sparkStaging目录下不会再保留jar包。
问题定位
按道理来说,因为spark.yarn.preserve.staging.files默认是false,所以HDFS上的jar包是不会被保留的。但是在spark1.0.2中,却没有删除。我看了下1.0.2的代码,删除的机制是存在的:
//yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

view plaincopy to clipboardprint?
<span style="font-size:14px;"><span style="font-family:Microsoft YaHei;font-size:12px;"> /** * Clean up the staging directory. */ private def cleanupStagingDir() var stagingDirPath: Path = null try val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean if (!preserveFiles) stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR")) if (stagingDirPath == null) logError("Staging directory is null") return logInfo("Deleting staging directory " + stagingDirPath) fs.delete(stagingDirPath, true) catch case ioe: IOException => logError("Failed to cleanup staging dir " + stagingDirPath, ioe) </span></span>
按照这个逻辑,默认在AM关闭的时候,是会删除HDFS上的jar包的。不过没有正常删除。推测这应该是一个1.0.2里面的bug,而在1.1.0里面已经修复。

nodemanager节点上的jar包缓存
升级到1.1.0版本后,HDFS上的jar包问题就解决了。但是nodemanager节点上的jar包还是会保留。这个问题的定位很纠结,不过结果却出乎意料的简单。不说了,上结果吧。
配置方法
(1) 配置yarn-site.xml:
<span style="font-family:Microsoft YaHei;font-size:12px;"> <property>
<name>yarn.nodemanager.local-dirs</name>
<value>local-dir1, local-dir2,local-dir3</value>
</property>
<property>
<name>yarn.nodemanager.localizer.cache.target-size-mb</name>
<value>1024</value>
</property>
<property>
<name>yarn.nodemanager.localizer.cache.cleanup.interval-ms</name>
<value>1800000</value>
</property></span>
**yarn.nodemanager.local-dirs: 这个目录是nodemanager上的作业中间数据存放路径。推荐配置多个盘上的多个路径,从而分散作业执行中的磁盘IO压力。
**yarn.nodemanager.localizer.cache.target-size-mb:配置nodemanager上的缓存目录的最大限度。nodemanager上有一个deletion server服务,会定期检测,如果yarn.nodemanager.local-dirs中配置的目录大小(如果配置了多个,则计算多个目录的总大小)是否超过了这里设置的最大限度值。如果超过了,就删除一些已经执行完的container的缓存数据。
因为spark提交作业后遗留在nodemanager上的jar包就在yarn.nodemanager.local-dirs下面,所以只要这里配置合适的大小值。那么nodemanager上的deletion server是会自动检测并保证目录总大小的。所以只要配置了这个量,我们就不需要再担心nodemanager上的jar包缓存问题了,交给yarn就好了!很简单啊有木有,可就这么个问题,居然花了我一个星期的时间去定位。
**yarn.nodemanager.localizer.cache.cleanup.interval-ms: deletion server多长时间做一次检测,并且清除缓存目录直到目录大小低于target-size-mb的配置。
通过上面这三个量的配置,nodemanager会确保本地的缓存数据总量在target-size-mb之下,也就是超过了的话,之前的spark的jar包就会被删除。所以我们就不需要再担心nodemanager节点上的spark jar包缓存问题了。不过target-size-mb的默认值是10G,这个值当然可以根据你的实际情况进行调整。
参考技术A 配置方法
(1) 首先需要确保spark在1.1.0以上的版本。
(2) 在HDFS上建立一个公共lib库,比如/system/spark-lib/,设置权限为755。把spark-assembly-*.jar上传到公共lib库中。
(3) 在spark-env.sh中配置:

view plaincopy to clipboardprint?
<span style="font-size:14px;">spark.yarn.jar hdfs://yarncluster/system/spark_lib/spark-assembly-1.1.0-hadoop2.3.0-cdh5.1.0.jarspark.yarn.preserve.staging.files false</span>

**spark.yarn.jar配置成HDFS上的公共lib库中的jar包。这个配置项会使提交job时,不是从本地上传spark-assembly*.jar包,而是从HDFS的一个目录复制到另一个目录(不确定HDFS上的复制是怎么操作的),总的来说节省了一点时间。(网上有的文章里说,这里的配置,会节省掉上传jar包的步骤,其实是不对的,只是把从本地上传的步骤改成了在HDFS上的复制操作。)
**spark.yarn.preserve.staging.files: 这个配置项配置成false,表示在执行结束后,不保留staging files,也就是两个jar包。然后HDFS上的.sparkStaging下的两个jar包在作业执行完成后就会被删除。如果配置成true,执行完后HDFS上的.sparkStaging下两个jar包都会保存下来。
然后再运行,发现HDFS上.sparkStaging目录下不会再保留jar包。
问题定位
按道理来说,因为spark.yarn.preserve.staging.files默认是false,所以HDFS上的jar包是不会被保留的。但是在spark1.0.2中,却没有删除。我看了下1.0.2的代码,删除的机制是存在的:
//yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

view plaincopy to clipboardprint?
<span style="font-size:14px;"><span style="font-family:Microsoft YaHei;font-size:12px;"> /** * Clean up the staging directory. */ private def cleanupStagingDir() var stagingDirPath: Path = null try val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean if (!preserveFiles) stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR")) if (stagingDirPath == null) logError("Staging directory is null") return logInfo("Deleting staging directory " + stagingDirPath) fs.delete(stagingDirPath, true) catch case ioe: IOException => logError("Failed to cleanup staging dir " + stagingDirPath, ioe) </span></span>
按照这个逻辑,默认在AM关闭的时候,是会删除HDFS上的jar包的。不过没有正常删除。推测这应该是一个1.0.2里面的bug,而在1.1.0里面已经修复。

nodemanager节点上的jar包缓存
升级到1.1.0版本后,HDFS上的jar包问题就解决了。但是nodemanager节点上的jar包还是会保留。这个问题的定位很纠结,不过结果却出乎意料的简单。不说了,上结果吧。
配置方法
(1) 配置yarn-site.xml:
<span style="font-family:Microsoft YaHei;font-size:12px;"> <property>
<name>yarn.nodemanager.local-dirs</name>
<value>local-dir1, local-dir2,local-dir3</value>
</property>
<property>
<name>yarn.nodemanager.localizer.cache.target-size-mb</name>
<value>1024</value>
</property>
<property>
<name>yarn.nodemanager.localizer.cache.cleanup.interval-ms</name>
<value>1800000</value>
</property></span>
**yarn.nodemanager.local-dirs: 这个目录是nodemanager上的作业中间数据存放路径。推荐配置多个盘上的多个路径,从而分散作业执行中的磁盘IO压力。
**yarn.nodemanager.localizer.cache.target-size-mb:配置nodemanager上的缓存目录的最大限度。nodemanager上有一个deletion server服务,会定期检测,如果yarn.nodemanager.local-dirs中配置的目录大小(如果配置了多个,则计算多个目录的总大小)是否超过了这里设置的最大限度值。如果超过了,就删除一些已经执行完的container的缓存数据。
因为spark提交作业后遗留在nodemanager上的jar包就在yarn.nodemanager.local-dirs下面,所以只要这里配置合适的大小值。那么nodemanager上的deletion server是会自动检测并保证目录总大小的。所以只要配置了这个量,我们就不需要再担心nodemanager上的jar包缓存问题了,交给yarn就好了!很简单啊有木有,可就这么个问题,居然花了我一个星期的时间去定位。
**yarn.nodemanager.localizer.cache.cleanup.interval-ms: deletion server多长时间做一次检测,并且清除缓存目录直到目录大小低于target-size-mb的配置。
通过上面这三个量的配置,nodemanager会确保本地的缓存数据总量在target-size-mb之下,也就是超过了的话,之前的spark的jar包就会被删除。所以我们就不需要再担心nodemanager节点上的spark jar包缓存问题了。不过target-size-mb的默认值是10G,这个值当然可以根据你的实际情况进行调整。
参考技术B 配置方法 (1) 首先需要确保spark在110以上的版本。 (2) 在HDFS上建立一个公共lib库,比如/system/spark-lib/,设置权限为755。把spark-assembly-*jar上传到公共lib库中。 (3) 在spark-envsh中配置: view plaincopy to clipboardprint?怎样获得在yarn框架上运行jar包的执行结果

Quartz定时调度jar包的执行Demo分享

1.Quartz简介

? Quartz框架的核心是调度器。调度器负责管理Quartz应用运行时环境。调度器不是靠自己做所有的工作,而是依赖框架内一些非常重要的部件。Quartz不仅仅是线程和线程管理。为确保可伸缩性,Quartz采用了基于多线程的架构。启动时,框架初始化一套worker线程,这套线程被调度器用来执行预定的作业。这就是Quartz怎样能并发运行多个作业的原理。Quartz依赖一套松耦合的线程池管理部件来管理线程环境。

2.项目相关

? 该定时器Demo用于定时执行制定路径下的jar包的编译,也可以用于普通的任务调度.通过对任务的查询修改删除来管理整个列表文件.可以通关开启和关闭来更改jar的开始执行状态(开启后如果需要停止只能关闭或重启服务器才能生效,具体的解决办法还在改进中).相关的一整套的UI界面比较简略,可以进行二次开发,主要整合了JPA的分页查询,可以查看相关代码部分.

技术图片

3.测试路径

//主页路径:
http://localhost:9090/
//启动jar的路径(这里的我只是建了一个简单的springboot的helloword的Jar包,相关jar包在项目中可以找到)
http://localhost:8088/sayHello

4.代码相关

QuartzServiceImpl:
@Service
public class QuartzServiceImpl implements QuartzService {

    @Autowired
    private JobEntityRepository repository;

    @Override
    public JobEntity getById(int id) {
        return repository.getById(id);
    }

    //通过Id获取Job
    public JobEntity getJobEntityById(Integer id) {
        return repository.getById(id);
    }
    //从数据库中加载获取到所有Job
    public List<JobEntity> loadJobs() {
        List<JobEntity> list = new ArrayList<>();
        repository.findAll().forEach(list::add);
        return list;
    }
    //获取JobDataMap.(Job参数对象)
    public JobDataMap getJobDataMap(JobEntity job) {
        JobDataMap map = new JobDataMap();
        map.put("name", job.getName());
        map.put("group", job.getGroup());
        map.put("cronExpression", job.getCron());
        map.put("parameter", job.getParameter());
        map.put("JobDescription", job.getDescription());
        map.put("vmParam", job.getVmParam());
        map.put("jarPath", job.getJarPath());
        map.put("status", job.getStatus());
        return map;
    }
    //获取JobDetail,JobDetail是任务的定义,而Job是任务的执行逻辑,JobDetail里会引用一个Job Class来定义
    public JobDetail geJobDetail(JobKey jobKey, String description, JobDataMap map) {
        return JobBuilder.newJob(DynamicJob.class)
                .withIdentity(jobKey)
                .withDescription(description)
                .setJobData(map)
                .storeDurably()
                .build();
    }
    //获取Trigger (Job的触发器,执行规则)
    public Trigger getTrigger(JobEntity job) {
        return TriggerBuilder.newTrigger()
                .withIdentity(job.getName(), job.getGroup())
                .withSchedule(CronScheduleBuilder.cronSchedule(job.getCron()))
                .build();
    }
    //获取JobKey,包含Name和Group
    public JobKey getJobKey(JobEntity job) {
        return JobKey.jobKey(job.getName(), job.getGroup());
    }
}
DynamicJob:
/**
 * :@DisallowConcurrentExecution : 此标记用在实现Job的类上面,意思是不允许并发执行.
 * :注意org.quartz.threadPool.threadCount线程池中线程的数量至少要多个,否则@DisallowConcurrentExecution不生效
 * :假如Job的设置时间间隔为3秒,但Job执行时间是5秒,设置@DisallowConcurrentExecution以后程序会等任务执行完毕以后再去执行,否则会在3秒时再启用新的线程执行
 */
@DisallowConcurrentExecution
@PersistJobDataAfterExecution //没有异常就更新数据 可用?
@Component
@Slf4j
public class DynamicJob implements Job {
    private Logger logger = LoggerFactory.getLogger(DynamicJob.class);

    /**
     * 核心方法,Quartz Job真正的执行逻辑.
     *   executorContext JobExecutionContext中封装有Quartz运行所需要的所有信息
     * @throws JobExecutionException execute()方法只允许抛出JobExecutionException异常
     */
    @Override
    public void execute(JobExecutionContext executionContext) throws JobExecutionException {

        JobDataMap map = executionContext.getMergedJobDataMap();

        String jarPath = map.getString("jarPath");
        String parameter = map.getString("parameter");
        String vmParam = map.getString("vmParam");
        logger.info("Running Job name : {} ", map.getString("name"));
        logger.info("Running Job description : " + map.getString("JobDescription"));
        logger.info("Running Job group: {} ", map.getString("group"));
        logger.info("Running Job cron : " + map.getString("cronExpression"));
        logger.info("Running Job jar path : {} ", jarPath);
        logger.info("Running Job parameter : {} ", parameter);
        logger.info("Running Job vmParam : {} ", vmParam);

        long startTime = System.currentTimeMillis();
        if (!StringUtils.getStringUtil.isEmpty(jarPath)) {

            File jar = new File(jarPath);
            if (jar.exists()) {
                ProcessBuilder processBuilder = new ProcessBuilder();
                processBuilder.directory(jar.getParentFile());
                /**
                 * 这个是java的执行命令
                 * java -jar
                 */
                List<String> commands = new ArrayList<>();
                commands.add("java");
                if (!StringUtils.getStringUtil.isEmpty(vmParam)) {
                    commands.add(vmParam);
                }
                commands.add("-jar");
                commands.add(jarPath);
                commands.add(" &");
                System.out.println("commands->\n"+commands);

                if (!StringUtils.getStringUtil.isEmpty(parameter)) {

                    commands.add(parameter);
                    processBuilder.command(commands);
                    logger.info("Running Job details as follows >>>>>>>>>>>>>>>>>>>>: ");
                    logger.info("Running Job commands : {}  ", StringUtils.getStringUtil.getListString(commands));
                    try {
                        Process process = processBuilder.start();
                        logProcess(process.getInputStream(), process.getErrorStream());

                    } catch (Exception e) {

                        e.printStackTrace();
                    }
                } else {

                    throw new JobExecutionException("Job Jar not found >>  " + jarPath);
                }

                long endTime = System.currentTimeMillis();
                logger.info(">>>>>>>>>>>>> Running Job has been completed , cost time :  " + (endTime - startTime) + "ms\n");
            }


        }


    }

    //打印Job执行内容的日志
    private void logProcess(InputStream inputStream, InputStream errorStream) throws IOException {
        String inputLine;
        String errorLine;
        BufferedReader inputReader = new BufferedReader(new InputStreamReader(inputStream));
        BufferedReader errorReader = new BufferedReader(new InputStreamReader(errorStream));
        while ((inputLine = inputReader.readLine()) != null){
            logger.info(inputLine);
        }
        while ((errorLine = errorReader.readLine()) != null) {
            logger.error(errorLine);
        }
    }

}

5.项目git地址

(喜欢记得点星支持哦,谢谢!)

https://github.com/fengcharly/quartz-jpa

以上是关于怎样获得在yarn框架上运行jar包的执行结果的主要内容,如果未能解决你的问题,请参考以下文章

怎样获得在yarn框架上运行jar包的执行结果

Quartz定时调度jar包的执行Demo分享

怎样把jar包放到linux下运行

下载了lucene 3.0的jar包后,怎样运行? 刚接触lucene,求高手指点

jar 启动 springboot 怎样输入网址

Linux Shell 怎样获得命令的执行结果