How to avoid UnsatisfiedLinkError when loading Parquet data into PIG


Posted: 2016-08-19 16:44:38

Question:

I am trying to load Parquet data into a Pig script using org.apache.parquet.pig.ParquetLoader() from parquet-pig-bundle-1.8.1.jar, with Pig version 0.15.0.2.4.2.0-258. The script is a very simple load and dump, just to make sure everything works.

My script is:

register 'parquet-pig-bundle-1.8.1.jar';
dat = LOAD '/project/part-r-00075.parquet'
    USING org.apache.parquet.pig.ParquetLoader();

dat_limited = LIMIT dat 5;

DUMP dat_limited;

However, when I run it, I get:

2016-08-19 12:38:01,536 [main] ERROR org.apache.pig.tools.grunt.Grunt - ERROR 2998: Unhandled internal error. org.xerial.snappy.SnappyNative.uncompressedLength(Ljava/nio/ByteBuffer;II)I
Details at logfile: /devel/mrp/pig/ttfs3_examples/pig_1471624672895.log
2016-08-19 12:38:01,581 [main] INFO  org.apache.pig.Main - Pig script completed in 9 seconds and 32 milliseconds (9032 ms)
Aug 19, 2016 12:37:57 PM INFO: org.apache.parquet.hadoop.ParquetInputFormat: Total input paths to process : 1
Aug 19, 2016 12:37:57 PM INFO: org.apache.parquet.hadoop.ParquetFileReader: Initiating action with parallelism: 5
Aug 19, 2016 12:37:57 PM INFO: org.apache.parquet.hadoop.ParquetFileReader: reading another 1 footers
Aug 19, 2016 12:37:57 PM INFO: org.apache.parquet.hadoop.ParquetFileReader: Initiating action with parallelism: 5
Aug 19, 2016 12:37:58 PM INFO: org.apache.parquet.hadoop.ParquetInputFormat: Total input paths to process : 1
Aug 19, 2016 12:37:59 PM INFO: org.apache.parquet.hadoop.ParquetInputFormat: Total input paths to process : 1
Aug 19, 2016 12:37:59 PM WARNING: org.apache.parquet.hadoop.ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
Aug 19, 2016 12:37:59 PM INFO: org.apache.parquet.hadoop.InternalParquetRecordReader: RecordReader initialized will read a total of 64797 records.
Aug 19, 2016 12:37:59 PM INFO: org.apache.parquet.hadoop.InternalParquetRecordReader: at row 0. reading next block
Aug 19, 2016 12:38:01 PM INFO: org.apache.parquet.hadoop.InternalParquetRecordReader: block read in memory in 1244 ms. row count = 63113
2016-08-19 12:38:01,832 [Thread-0] ERROR org.apache.hadoop.hdfs.DFSClient - Failed to close inode 457368033
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /tmp/temp-1982281463/tmp1114763885/_temporary/0/_temporary/attempt__0001_m_000001_1/part-m-00001 (inode 457368033): File does not exist. Holder DFSClient_NONMAPREDUCE_-797544746_1 does not have any open files.
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3481)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3571)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3538)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:884)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:544)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2206)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2202)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1709)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2200)

    at org.apache.hadoop.ipc.Client.call(Client.java:1426)
    at org.apache.hadoop.ipc.Client.call(Client.java:1363)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy12.complete(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:464)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:256)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
    at com.sun.proxy.$Proxy13.complete(Unknown Source)
    at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2354)
    at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2336)
    at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2300)
    at org.apache.hadoop.hdfs.DFSClient.closeAllFilesBeingWritten(DFSClient.java:951)
    at org.apache.hadoop.hdfs.DFSClient.closeOutputStreams(DFSClient.java:983)
    at org.apache.hadoop.hdfs.DistributedFileSystem.close(DistributedFileSystem.java:1134)
    at org.apache.hadoop.fs.FileSystem$Cache.closeAll(FileSystem.java:2744)
    at org.apache.hadoop.fs.FileSystem$Cache$ClientFinalizer.run(FileSystem.java:2761)
    at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)

The log file contains:

Pig Stack Trace
---------------
ERROR 2998: Unhandled internal error. org.xerial.snappy.SnappyNative.uncompressedLength(Ljava/nio/ByteBuffer;II)I

java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.uncompressedLength(Ljava/nio/ByteBuffer;II)I
    at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
    at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:561)
    at org.apache.parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:62)
    at org.apache.parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
    at java.io.DataInputStream.readFully(DataInputStream.java:195)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at org.apache.parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:204)
    at org.apache.parquet.column.impl.ColumnReaderImpl.readPageV1(ColumnReaderImpl.java:591)
    at org.apache.parquet.column.impl.ColumnReaderImpl.access$300(ColumnReaderImpl.java:60)
    at org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:540)
    at org.apache.parquet.column.impl.ColumnReaderImpl$3.visit(ColumnReaderImpl.java:537)
    at org.apache.parquet.column.page.DataPageV1.accept(DataPageV1.java:96)
    at org.apache.parquet.column.impl.ColumnReaderImpl.readPage(ColumnReaderImpl.java:537)
    at org.apache.parquet.column.impl.ColumnReaderImpl.checkRead(ColumnReaderImpl.java:529)
    at org.apache.parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:641)
    at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:357)
    at org.apache.parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:82)
    at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:77)
    at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:270)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:135)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:101)
    at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154)
    at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:101)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:140)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:214)
    at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:227)
    at org.apache.parquet.pig.ParquetLoader.getNext(ParquetLoader.java:230)
    at org.apache.pig.impl.io.ReadToEndLoader.getNextHelper(ReadToEndLoader.java:251)
    at org.apache.pig.impl.io.ReadToEndLoader.getNext(ReadToEndLoader.java:231)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad.getNextTuple(POLoad.java:137)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:307)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit.getNextTuple(POLimit.java:122)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:307)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore.getNextTuple(POStore.java:159)
    at org.apache.pig.backend.hadoop.executionengine.fetch.FetchLauncher.runPipeline(FetchLauncher.java:157)
    at org.apache.pig.backend.hadoop.executionengine.fetch.FetchLauncher.launchPig(FetchLauncher.java:81)
    at org.apache.pig.backend.hadoop.executionengine.HExecutionEngine.launchPig(HExecutionEngine.java:302)
    at org.apache.pig.PigServer.launchPlan(PigServer.java:1431)
    at org.apache.pig.PigServer.executeCompiledLogicalPlan(PigServer.java:1416)
    at org.apache.pig.PigServer.storeEx(PigServer.java:1075)
    at org.apache.pig.PigServer.store(PigServer.java:1038)
    at org.apache.pig.PigServer.openIterator(PigServer.java:951)
    at org.apache.pig.tools.grunt.GruntParser.processDump(GruntParser.java:754)
    at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:376)
    at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:230)
    at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:205)
    at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:81)
    at org.apache.pig.Main.run(Main.java:631)
    at org.apache.pig.Main.main(Main.java:177)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
================================================================================

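I checked the source of ParquetLoader, and it does appear to have a valid no-argument constructor signature. I also tried adding several other dependencies that do not seem to be packaged with parquet-pig-bundle, such as parquet-common and parquet-encoding, but without success.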


Answer 1:

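The problem here is that Hadoop and Pig disagree on the version of snappy: the older snappy version shipped with Hadoop was being picked up. Once I added export HADOOP_USER_CLASSPATH_FIRST=true to my ~/.bashrc, the problem went away.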
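As a rough sketch of how this fix could be applied and checked in a shell session: the export line is the one given above, while find_snappy_jars is a hypothetical helper written for illustration (it is not part of Hadoop or Pig). The optional check assumes a Hadoop release whose `hadoop classpath` command accepts `--glob` to expand wildcard entries.

```shell
#!/bin/sh
# Hypothetical helper (not part of Hadoop or Pig): print the entries of a
# colon-separated classpath whose path mentions "snappy".
find_snappy_jars() {
    # Split on ':' so each entry is on its own line, then filter.
    # "|| true" keeps the function from failing when nothing matches.
    printf '%s\n' "$1" | tr ':' '\n' | grep -i 'snappy' || true
}

# Ask Hadoop to place user-supplied jars ahead of its own on the classpath,
# so the snappy version Hadoop ships is no longer the first one found.
export HADOOP_USER_CLASSPATH_FIRST=true

# Optional check, only if the hadoop command is available:
# list the snappy jars that Hadoop itself puts on the classpath.
if command -v hadoop >/dev/null 2>&1; then
    find_snappy_jars "$(hadoop classpath --glob)"
fi
```

To make the setting permanent, the export line would go into ~/.bashrc as described above; a new shell then needs to be opened before rerunning the Pig script. If the check lists a snappy jar, that is a candidate for the conflicting copy.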
