如何将小 ORC 文件合并或合并为更大的 ORC 文件?

Posted

技术标签:

【中文标题】如何将小 ORC 文件合并或合并为更大的 ORC 文件?【英文标题】:How do I Combine or Merge Small ORC files into Larger ORC file? 【发布时间】:2018-10-07 02:35:55 【问题描述】:

SO 和网络上的大多数问题/答案都在讨论使用 Hive 将一堆小的 ORC 文件组合成一个更大的文件,但是,我的 ORC 文件是按天分隔的日志文件,我需要将它们分开。我只想每天“汇总”ORC 文件(它们是 HDFS 中的目录)。

我很可能需要用 Java 编写解决方案,并且遇到过OrcFileMergeOperator,这可能是我需要使用的,但现在说还为时过早。

解决此问题的最佳方法是什么?

【问题讨论】:

【参考方案1】:

这是一个使用PyORC 将小的ORC 文件连接在一起的Python 小脚本。我知道它不会直接回答您的问题,因为它不是在 Java 中,但我发现它比当前的解决方案或使用 Hive 更简单。

import pyorc
import argparse


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-o', '--output', type=argparse.FileType(mode='wb'))
    parser.add_argument('files', type=argparse.FileType(mode='rb'), nargs='+')
    args = parser.parse_args()

    schema = str(pyorc.Reader(args.files[0]).schema)

    with pyorc.Writer(args.output, schema) as writer:
        for i, f in enumerate(args.files):
            reader = pyorc.Reader(f)
            if str(reader.schema) != schema:
                raise RuntimeError(
                    "Inconsistent ORC schemas.\n"
                    "\tFirst file schema: \n"
                    "\tFile # schema: "
                    .format(schema, i, str(reader.schema))
                )
            for line in reader:
                writer.write(line)


if __name__ == '__main__':
    main()

【讨论】:

【参考方案2】:

您无需重新发明***。

ALTER TABLE table_name [PARTITION partition_spec] CONCATENATE 可用于将小的 ORC 文件合并到更大的文件中,因为Hive 0.14.0. 合并发生在条带级别,这避免了对数据进行解压缩和解码。它工作得很快。我建议创建一个按天分区的外部表(分区是目录),然后将它们全部合并,指定PARTITION (day_column) 作为分区规范。

请看这里:LanguageManual+ORC

【讨论】:

不幸的是,我没有“day_column”,因为数据是“journald -o json”的输出,它被转换为 ORC 并存储在 HDFS 的外部表中,目录结构为 yyyy/ mm/dd/file1.orc file2.orc file3.orc等。所有时间戳都以纪元时间为准。 @ChrisC 但是您可以在数据根文件夹顶部创建带有附加分区列的新外部表,然后将分区挂载到 yyyy/mm/dd 位置,然后使用 hive 连接所有分区。 1)通过load_day创建外部表+分区。 2)更改表添加分区等以将它们全部挂载。 3) 使用 Hive 连接 CONCATENATE 也适用于外部表吗?我知道它没有。 @OmarAli 当然,它有效。外部或管理,没关系。外部和托管之间的唯一区别是 DROP 表行为。托管表 DROP 也会删除数据。外部表删除将仅删除表定义。您还可以一次在 HDFS 的同一目录上创建几个不同的表。 @FoxanNg 是的,它是在issues.apache.org/jira/browse/HIVE-17403Hive 的 3.0.0、2.4.0 版本中添加的【参考方案3】:

这里有很好的答案,但没有一个允许我运行 cron 作业以便我可以每天汇总。我们每天都将日志文件写入 HDFS,我不想每天进来时都在 Hive 中运行查询。

我最终做的事情对我来说似乎更直接。我编写了一个 Java 程序,它使用 ORC 库扫描目录中的所有文件并创建这些文件的列表。然后打开一个新的 Writer,它是“组合”文件(以“.”开头,因此它对 Hive 隐藏,否则 Hive 将失败)。然后程序打开列表中的每个文件并读取内容并写出组合文件。读取所有文件后,它会删除文件。我还添加了一次运行一个目录的功能,以备不时之需。

注意:您将需要一个架构文件。 Journald 日志可以以 json "journalctl -o json" 的形式输出,然后您可以使用 Apache ORC 工具生成模式文件,也可以手动生成。 ORC 的自动生成很好,但手动总是更好。

注意:要按原样使用此代码,您需要一个有效的密钥表并在类路径中添加 -Dkeytab=。

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

import com.cloudera.org.joda.time.LocalDate;

public class OrcFileRollUp 

  private final static String SCHEMA = "journald.schema";
  private final static String UTF_8 = "UTF-8";
  private final static String HDFS_BASE_LOGS_DIR = "/<baseDir>/logs";
  private static final String keytabLocation = System.getProperty("keytab");
  private static final String kerberosUser = "<userName>";
  private static Writer writer;

  public static void main(String[] args) throws IOException 

    Configuration conf = new Configuration();
    conf.set("hadoop.security.authentication", "Kerberos");

    InetAddress myHost = InetAddress.getLocalHost();
    String kerberosPrincipal = String.format("%s/%s", kerberosUser, myHost.getHostName());
    UserGroupInformation.setConfiguration(conf);
    UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, keytabLocation);

    int currentDay = LocalDate.now().getDayOfMonth();
    int currentMonth = LocalDate.now().getMonthOfYear();
    int currentYear = LocalDate.now().getYear();

    Path path = new Path(HDFS_BASE_LOGS_DIR);

    FileSystem fileSystem = path.getFileSystem(conf);
    System.out.println("The URI is: " + fileSystem.getUri());


    //Get Hosts:
    List<String> allHostsPath = getHosts(path, fileSystem);

    TypeDescription schema = TypeDescription.fromString(getSchema(SCHEMA)
        .replaceAll("\n", ""));

    //Open each file for reading and write contents
    for(int i = 0; i < allHostsPath.size(); i++) 

      String outFile = "." + currentYear + "_" + currentMonth + "_" + currentDay + ".orc.working";            //filename:  .2018_04_24.orc.working

      //Create list of files from directory and today's date OR pass a directory in via the command line in format 
      //hdfs://<namenode>:8020/HDFS_BASE_LOGS_DIR/<hostname>/2018/4/24/
      String directory = "";
      Path outFilePath;
      Path argsPath;
      List<String> orcFiles;

      if(args.length == 0) 
        directory = currentYear + "/" + currentMonth + "/" + currentDay;
        outFilePath = new Path(allHostsPath.get(i) + "/" + directory + "/" + outFile);
        try 
          orcFiles = getAllFilePath(new Path(allHostsPath.get(i) + "/" + directory), fileSystem);
         catch (Exception e) 
          continue;
        
       else 
        outFilePath = new Path(args[0] + "/" + outFile);
        argsPath = new Path(args[0]);
        try 
          orcFiles = getAllFilePath(argsPath, fileSystem);
         catch (Exception e) 
          continue;
        
      

      //Create List of files in the directory

      FileSystem fs = outFilePath.getFileSystem(conf);

      //Writer MUST be below ^^ or the combination file will be deleted as well.
      if(fs.exists(outFilePath)) 
        System.out.println(outFilePath + " exists, delete before continuing.");
       else 
       writer = OrcFile.createWriter(outFilePath, OrcFile.writerOptions(conf)
            .setSchema(schema));
      

      for(int j = 0; j < orcFiles.size(); j++ )  
        Reader reader = OrcFile.createReader(new Path(orcFiles.get(j)), OrcFile.readerOptions(conf));

        VectorizedRowBatch batch = reader.getSchema().createRowBatch();
        RecordReader rows = reader.rows();

        while (rows.nextBatch(batch)) 
          if (batch != null) 
             writer.addRowBatch(batch);
          
        
        rows.close();
        fs.delete(new Path(orcFiles.get(j)), false);
      
      //Close File
      writer.close();

      //Remove leading "." from ORC file to make visible to Hive
      outFile = fileSystem.getFileStatus(outFilePath)
                                      .getPath()
                                      .getName();

      if (outFile.startsWith(".")) 
        outFile = outFile.substring(1);

        int lastIndexOf = outFile.lastIndexOf(".working");
        outFile = outFile.substring(0, lastIndexOf);
      

      Path parent = outFilePath.getParent();

      fileSystem.rename(outFilePath, new Path(parent, outFile));

      if(args.length != 0)
        break;
    
  

  private static String getSchema(String resource) throws IOException 
    try (InputStream input = OrcFileRollUp.class.getResourceAsStream("/" + resource)) 
      return IOUtils.toString(input, UTF_8);
    
  

  public static List<String> getHosts(Path filePath, FileSystem fs) throws FileNotFoundException, IOException 
    List<String> hostsList = new ArrayList<String>();
    FileStatus[] fileStatus = fs.listStatus(filePath);
    for (FileStatus fileStat : fileStatus) 
      hostsList.add(fileStat.getPath().toString());
    
    return hostsList;
  

  private static List<String> getAllFilePath(Path filePath, FileSystem fs) throws FileNotFoundException, IOException 
    List<String> fileList = new ArrayList<String>();
    FileStatus[] fileStatus = fs.listStatus(filePath);
    for (FileStatus fileStat : fileStatus) 
      if (fileStat.isDirectory()) 
        fileList.addAll(getAllFilePath(fileStat.getPath(), fs));
       else 
        fileList.add(fileStat.getPath()
                             .toString());
      
    
    for(int i = 0; i< fileList.size(); i++) 
      if(!fileList.get(i).endsWith(".orc"))
        fileList.remove(i);
    

    return fileList;
  


【讨论】:

说真的,有人否决了我自己的解决方案?然后提供一个更好的选择,不要只是投反对票并继续前进。 如果不需要,您是否知道如何禁用身份验证 (Kerberos)? @Ryan 你知道如何禁用身份验证(kerberos)吗? @Riddle 我有一个可以分享的代码库。你能提醒我星期二吗?现在就打倒新冠病毒。 @Riddle 让我知道这个 sn-p 是否对您有帮助 goonlinetools.com/snapshot/code/#ktak5lzugmoyapeyipuy8e

以上是关于如何将小 ORC 文件合并或合并为更大的 ORC 文件?的主要内容,如果未能解决你的问题,请参考以下文章

通过 Databricks 笔记本更改表表名 CONCATENATE 错误

如何在写入hive orc表时合并spark中的小文件

在 Delphi 中是不是可以将枚举合并为更大的枚举?

平面文件(orc,csv)比火花中的增量表更有效吗

Databricks 更新表不适用于 orc 格式

如何合并 sparksql 保存在 hive 上的小文件?