Hadoop API: traverse file partition directories and submit Spark jobs in parallel based on the data under them
The Hadoop FileSystem API provides methods for listing files in HDFS, which makes it easy to walk a directory tree. The class below uses listStatus to collect the objectid= partition directories of two tables, intersects the two lists of object IDs, and starts one thread per common ID to submit a Spark job:
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BatchSubmitMain {
    public static void main(String[] args) throws Exception {
        String mrTableName = args[0];
        String fglibTableName = args[1];

        Configuration conf = new Configuration();
        /*
         * <property>
         *   <name>fs.defaultFS</name>
         *   <value>hdfs://hcluster</value>
         * </property>
         */
        conf.set("fs.defaultFS", "hdfs://hcluster");
        FileSystem fileSystem = FileSystem.get(conf);

        String mrFilePath = "/myuser/hivedb/" + mrTableName;
        String fglibFilePath = "/myuser/hivedb/" + fglibTableName;
        System.out.println(mrFilePath);
        List<String> mrObjectIdItems = getObjectIdItems(fileSystem, mrFilePath);
        System.out.println(fglibFilePath);
        List<String> fglibObjectIdItems = getObjectIdItems(fileSystem, fglibFilePath);

        // Keep only the object IDs that appear under both tables
        List<String> objectIdItems = new ArrayList<>();
        for (String mrObjectId : mrObjectIdItems) {
            for (String fglibObjectId : fglibObjectIdItems) {
                if (mrObjectId.equals(fglibObjectId)) {   // compare string values, not references
                    objectIdItems.add(mrObjectId);
                }
            }
        }

        // Submit one job per object ID and wait for all of them to finish
        String submitShPath = "/app/myaccount/service/submitsparkjob.sh";
        CountDownLatch threadSignal = new CountDownLatch(objectIdItems.size());
        for (int ii = 0; ii < objectIdItems.size(); ii++) {
            String objectId = objectIdItems.get(ii);
            Thread thread = new ImportThread(objectId, submitShPath, threadSignal);
            thread.start();
        }
        threadSignal.await();
        System.out.println(Thread.currentThread().getName() + " complete");
    }

    private static List<String> getObjectIdItems(FileSystem fileSystem, String filePath)
            throws FileNotFoundException, IOException {
        List<String> objectItems = new ArrayList<>();
        Path path = new Path(filePath);
        // List the entries directly under the table directory
        FileStatus[] files = fileSystem.listStatus(path);
        for (int i = 0; i < files.length; i++) {
            try {
                if (files[i].isDirectory()) {
                    // Partition directories are named "objectid=<value>"
                    String[] fileItems = files[i].getPath().getName().split("/");
                    String objectId = fileItems[fileItems.length - 1].replace("objectid=", "");
                    objectItems.add(objectId);
                    System.out.println(objectId);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return objectItems;
    }

    /**
     * @param hdfs FileSystem instance
     * @param path directory to walk
     */
    public static void iteratorShowFiles(FileSystem hdfs, Path path) {
        try {
            if (hdfs == null || path == null) {
                return;
            }
            // List the entries under the current directory
            FileStatus[] files = hdfs.listStatus(path);
            for (int i = 0; i < files.length; i++) {
                try {
                    if (files[i].isDirectory()) {
                        System.out.print(">>>" + files[i].getPath()
                                + ", dir owner:" + files[i].getOwner());
                        // Recurse into sub-directories
                        iteratorShowFiles(hdfs, files[i].getPath());
                    } else if (files[i].isFile()) {
                        System.out.print("   " + files[i].getPath()
                                + ", length:" + files[i].getLen()
                                + ", owner:" + files[i].getOwner());
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
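A note on the intersection step: the nested loops compare every ID from one table against every ID from the other, which grows quadratically with the number of partitions. If the tables have many partitions, a set-based lookup does the same job in linear time. A minimal sketch of that alternative (the class and method names here are illustrative, not part of the original code):

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ObjectIdIntersection {
    // Returns the IDs present in both lists; a drop-in replacement for the nested loops above.
    public static List<String> intersect(List<String> mrObjectIdItems, List<String> fglibObjectIdItems) {
        Set<String> fglibIdSet = new HashSet<>(fglibObjectIdItems);
        List<String> objectIdItems = new ArrayList<>();
        for (String mrObjectId : mrObjectIdItems) {
            if (fglibIdSet.contains(mrObjectId)) {   // O(1) lookup instead of an inner loop
                objectIdItems.add(mrObjectId);
            }
        }
        return objectIdItems;
    }
}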
The thread class that runs the shell script for one object ID:
import java.util.concurrent.CountDownLatch;

public class ImportThread extends Thread {
    private final JavaShellInvoker javaShellInvoker = new JavaShellInvoker();
    private CountDownLatch countDownLatch;
    private String objectId;
    private String submitShPath;

    public ImportThread(String objectId, String submitShPath, CountDownLatch countDownLatch) {
        this.objectId = objectId;
        this.submitShPath = submitShPath;
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        // Log the start of this submission
        System.out.println(Thread.currentThread().getName() + " start... "
                + this.submitShPath + " " + this.objectId);
        try {
            int result = this.javaShellInvoker.executeShell("mrraster", this.submitShPath, this.objectId);
            if (result != 0) {
                System.out.println(Thread.currentThread().getName() + " result type is error");
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println(Thread.currentThread().getName() + "-error:" + e.getMessage());
        }
        this.countDownLatch.countDown();   // decrement the latch so main() can finish
        // Log the end of this submission
        System.out.println(Thread.currentThread().getName() + " complete, last "
                + this.countDownLatch.getCount() + " threads");
    }
}
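Starting one raw thread per object ID means one concurrent spark-submit per partition, which can overload the client host or the YARN queue when there are many partitions. If that becomes a problem, a fixed-size thread pool bounds the parallelism while keeping the same per-ID shell invocation. A minimal sketch, assuming the JavaShellInvoker class below is available (the class name and pool size are illustrative, not from the original post):

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BoundedSubmitter {
    // Submits one shell invocation per objectId, but never more than poolSize at a time.
    public static void submitAll(List<String> objectIdItems, String submitShPath, int poolSize)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        JavaShellInvoker invoker = new JavaShellInvoker();
        for (String objectId : objectIdItems) {
            pool.submit(() -> {
                try {
                    int result = invoker.executeShell("mrraster", submitShPath, objectId);
                    if (result != 0) {
                        System.out.println(objectId + " result type is error");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        pool.shutdown();                           // stop accepting new tasks
        pool.awaitTermination(1, TimeUnit.DAYS);   // wait for all submissions to finish
    }
}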
The Java helper that actually executes the shell script:
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;

public class JavaShellInvoker {
    private static final String executeShellLogFile = "./executeShell_%s_%s.log";

    public int executeShell(String shellCommandType, String shellCommand, String args) throws Exception {
        int success = 0;
        args = (args == null) ? "" : args;
        String now = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
        File logFile = new File(String.format(executeShellLogFile, shellCommandType, now));

        // Run "sh <shellCommand> <args>" and append stdout/stderr to a daily log file
        ProcessBuilder pb = new ProcessBuilder("sh", shellCommand, args);
        pb.redirectOutput(ProcessBuilder.Redirect.appendTo(logFile));
        pb.redirectError(ProcessBuilder.Redirect.appendTo(logFile));

        Process pid = null;
        try {
            pid = pb.start();
            success = pid.waitFor();
        } catch (Exception ex) {
            success = 2;
            System.out.println("executeShell-error:" + ex.getMessage());
            throw ex;
        } finally {
            // pid is null if pb.start() failed; kill the process if it is still running
            if (pid != null && pid.isAlive()) {
                pid.destroy();
            }
        }
        return success;
    }
}
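Because waitFor() blocks until spark-submit returns, a hung job would hold its thread forever. If that is a concern, Process.waitFor(long, TimeUnit), available since Java 8, lets the caller give up after a deadline. A minimal sketch (the class name and timeout parameter are illustrative assumptions, not part of the original code):

import java.io.File;
import java.util.concurrent.TimeUnit;

public class TimedShellInvoker {
    // Runs "sh <shellCommand> <args>" and gives up after timeoutMinutes.
    public static int execute(String shellCommand, String args, File logFile, long timeoutMinutes)
            throws Exception {
        ProcessBuilder pb = new ProcessBuilder("sh", shellCommand, args);
        pb.redirectOutput(ProcessBuilder.Redirect.appendTo(logFile));
        pb.redirectError(ProcessBuilder.Redirect.appendTo(logFile));
        Process process = pb.start();
        if (!process.waitFor(timeoutMinutes, TimeUnit.MINUTES)) {
            process.destroyForcibly();   // the script did not finish in time
            return 2;                    // same error code the original invoker uses
        }
        return process.exitValue();
    }
}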
submitsparkjob.sh
#!/bin/sh
source ../login.sh
spark-submit --master yarn-cluster \
  --class MySparkJobMainClass \
  --driver-class-path /app/myaccount/service/jars/ojdbc7.jar \
  --jars /app/myaccount/service/jars/ojdbc7.jar \
  --num-executors 20 \
  --driver-memory 6g \
  --executor-cores 1 \
  --executor-memory 8g \
  MySparkJobJar.jar $1
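The script forwards its first argument ($1, the object ID) as the Spark application's only argument, so each submitted job can restrict itself to a single partition. The original MySparkJobMainClass is not shown in the post; a minimal sketch of what such a driver might look like, assuming the data is read directly from the partition directory as Parquet (the table path, file format, and logic here are assumptions, not the author's code):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MySparkJobMainClass {
    public static void main(String[] args) {
        String objectId = args[0];   // passed through by submitsparkjob.sh as $1
        SparkSession spark = SparkSession.builder()
                .appName("process-objectid-" + objectId)
                .getOrCreate();
        // Hypothetical: read only the one partition directory for this object ID
        Dataset<Row> partition = spark.read()
                .parquet("/myuser/hivedb/my_table/objectid=" + objectId);
        System.out.println("rows for objectid=" + objectId + ": " + partition.count());
        spark.stop();
    }
}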
The command used to launch BatchSubmit.jar (BatchSubmitMain reads the two table names from its arguments):
hadoop jar BatchSubmit.jar <mrTableName> <fglibTableName>