Wrong ByteArray Returned From SequenceFileRecordReader

Posted: 2013-11-17 14:55:12

I am trying to read binary network packet data stored in an HDFS sequence file. The problem seems to lie in the non-printable characters, or in the packet headers.

Currently the data is stored into HDFS with Flume-ng (1.4) through a custom plugin (jpcap with the global tcpdump header stripped). Events are committed on a per-packet basis. That is the point where I read them back into Pig with a SequenceFileRecordReader.

For now, to keep things simple, I do nothing except read records from the seq file and write them straight to a file (output.pcap).

Excerpt from input.pcap (as retrieved from HDFS):

00000ab0  00 00 08 00 00 01 42 5c  4a e1 e9 00 00 00 00 00  |......B\J.......|
00000ac0  00 01 cf 00 00 00 08 00  00 01 42 5c 4a e1 ea 00  |..........B\J...|
00000ad0  00 01 c3 47 45 54 20 2f  20 48 54 54 50 2f 31 2e  |...GET / HTTP/1.|
00000ae0  31 0d 0a 48 6f 73 74 3a  20 31 39 32 2e 31 36 38  |1..Host: 192.168|
00000af0  2e 31 30 39 2e 31 32 38  0d 0a 41 63 63 65 70 74  |.109.128..Accept|

The same bulk excerpt from output.pcap (written by the Pig UDF):

000009a0  fa 83 31 5d 7d da 1e a0  b7 32 4f 50 65 ab 61 28  |..1]}....2OPe.a(|
000009b0  b1 ee 2b 6d 22 74 d9 64  bf 8d 60 23 62 a9 c5 ac  |..+m"t.d..`#b...|
000009c0  00 00 00 00 00 00 00 00  00 00 00 00 00 00 00 00  |................|
*
00000a40  00 00 01 c3 47 45 54 20  2f 20 48 54 54 50 2f 31  |....GET / HTTP/1|
00000a50  2e 31 0d 0a 48 6f 73 74  3a 20 31 39 32 2e 31 36  |.1..Host: 192.16|
00000a60  38 2e 31 30 39 2e 31 32  38 0d 0a 41 63 63 65 70  |8.109.128..Accep|

As you can see, the first hex dump shows 0x01425c4ae2ee, which translates to the timestamp 1384527880942, i.e. Fri, 15 Nov 2013 15:04:40 GMT. The other one shows nothing but zeros until the start of the packet data.
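
For reference, a minimal sketch of that conversion (plain java.time, nothing Hadoop-specific; the literal is just the millisecond value quoted above):

import java.time.Instant;

public class TsCheck {
    public static void main(String[] args) {
        long millis = 0x01425c4ae2eeL;                     // 1384527880942
        System.out.println(Instant.ofEpochMilli(millis)); // 2013-11-15T15:04:40.942Z
    }
}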

Hopefully someone can point me in the right direction here, so that I can read out packet headers like the following (a small parsing sketch follows the field breakdown below):

c2 ba cd 4f b6 35 0f 00  36 00 00 00 36 00 00 00

1-4b:   Timestamp, 0x4fcdbac2 (stored little-endian).
calc 0x4fcdbac2 -> 1338882754
-> date --date='1970-01-01 1338882754 sec GMT'

5-8b:   Microseconds of timestamp

9-12b:  Packet data size

13-16b: Length of packet as it was captured on the wire (54b). Can be the same as 9-12b but can differ if the snapshot length (max packet length) is less than 65536
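
A minimal sketch of decoding such a 16-byte record header in Java, assuming the little-endian layout described above (the class and variable names are mine, not from any library):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class PcapRecordHeader {
    public static void main(String[] args) {
        // The 16 header bytes quoted above.
        byte[] hdr = {
            (byte) 0xc2, (byte) 0xba, (byte) 0xcd, 0x4f,   // ts seconds
            (byte) 0xb6, 0x35, 0x0f, 0x00,                 // ts microseconds
            0x36, 0x00, 0x00, 0x00,                        // captured length
            0x36, 0x00, 0x00, 0x00                         // on-the-wire length
        };
        ByteBuffer buf = ByteBuffer.wrap(hdr).order(ByteOrder.LITTLE_ENDIAN);
        int tsSec   = buf.getInt();  // 0x4fcdbac2 -> 1338882754
        int tsUsec  = buf.getInt();  // 996790
        int inclLen = buf.getInt();  // 54
        int origLen = buf.getInt();  // 54
        System.out.printf("ts=%d.%06d incl=%d orig=%d%n", tsSec, tsUsec, inclLen, origLen);
    }
}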

Without further ado, here is PcapFileLoader.java:

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Arrays;

import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.pig.FileInputLoadFunc;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.BackendException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.krakenapps.pcap.util.ByteOrderConverter;

public class PcapFileLoader extends FileInputLoadFunc {

    private SequenceFileRecordReader<LongWritable, BytesWritable> reader;

    private Writable key;
    private BytesWritable value;

    private ArrayList<Object> mProtoTuple = null;

    protected TupleFactory mTupleFactory = TupleFactory.getInstance();
    protected SerializationFactory serializationFactory;

    protected byte[] currentPacket;

    protected byte keyType = DataType.UNKNOWN;
    protected byte valType = DataType.UNKNOWN;

    public PcapFileLoader() {
        mProtoTuple = new ArrayList<Object>(2);
    }

    protected void setKeyType(Class<?> keyClass) throws BackendException {
        this.keyType |= inferPigDataType(keyClass);
        if (keyType == DataType.ERROR) {
            throw new BackendException("Unable to translate " + key.getClass() + " to a Pig datatype");
        }
    }

    protected void setValueType(Class<?> valueClass) throws BackendException {
        this.valType |= inferPigDataType(valueClass);
        if (valType == DataType.ERROR) {
            throw new BackendException("Unable to translate " + valueClass + " to a Pig datatype");
        }
    }

    @Override
    public Tuple getNext() throws IOException {
        boolean next = false;
        try {
            next = reader.nextKeyValue();
        } catch (InterruptedException e) {
            throw new IOException(e);
        }

        if (!next) {
            return null;
        }

        key = reader.getCurrentKey();
        value = reader.getCurrentValue();

        currentPacket = value.getBytes();
        if (keyType == DataType.UNKNOWN && key != null) {
            setKeyType(key.getClass());
        }
        if (valType == DataType.UNKNOWN && value != null) {
            setValueType(value.getClass());
        }

        //readPacketHeader();
        ByteBuffer buffer = ByteBuffer.wrap(currentPacket);
        long ts = buffer.getLong();
        ts = ByteOrderConverter.swap(ts);
        System.out.println(ts);

        // Debug dump of the raw record. BytesWritable.getBytes() returns the
        // reused backing buffer, which can be zero-padded beyond getLength(),
        // so only the valid region is written.
        FileOutputStream file = new FileOutputStream(new File("output.pcap"), true);
        file.write(value.getBytes(), 0, value.getLength());
        file.close();

        mProtoTuple.add(translateWritableToPigDataType(key, keyType));
        mProtoTuple.add(translateWritableToPigDataType(value, valType));
        Tuple t = mTupleFactory.newTuple(mProtoTuple);
        mProtoTuple.clear();
        return t;
    }

    protected byte inferPigDataType(Type t) {
        if (t == DataByteArray.class) {
            return DataType.BYTEARRAY;
        } else if (t == BytesWritable.class) {
            return DataType.BYTEARRAY;
        } else if (t == Text.class) {
            return DataType.CHARARRAY;
        } else if (t == IntWritable.class) {
            return DataType.INTEGER;
        } else if (t == LongWritable.class) {
            return DataType.LONG;
        } else if (t == FloatWritable.class) {
            return DataType.FLOAT;
        } else if (t == DoubleWritable.class) {
            return DataType.DOUBLE;
        } else if (t == BooleanWritable.class) {
            return DataType.BOOLEAN;
        } else if (t == ByteWritable.class) {
            return DataType.BYTE;
        } else {
            // not doing maps or other complex types for now
            return DataType.ERROR;
        }
    }

    protected Object translateWritableToPigDataType(Writable w, byte dataType) {
        switch (dataType) {
            case DataType.CHARARRAY:
                return ((Text) w).toString();
            case DataType.BYTEARRAY:
                // Copy only the valid bytes; the backing array may be padded.
                return (w instanceof BytesWritable
                        ? new DataByteArray(((BytesWritable) w).getBytes(), 0, ((BytesWritable) w).getLength())
                        : w);
            case DataType.BOOLEAN:
                return ((BooleanWritable) w).get();
            case DataType.INTEGER:
                return ((IntWritable) w).get();
            case DataType.LONG:
                return ((LongWritable) w).get();
            case DataType.FLOAT:
                return ((FloatWritable) w).get();
            case DataType.DOUBLE:
                return ((DoubleWritable) w).get();
            case DataType.BYTE:
                return ((ByteWritable) w).get();
        }

        return null;
    }

    @SuppressWarnings("unchecked")
    @Override
    public InputFormat getInputFormat() throws IOException {
        return new SequenceFileInputFormat<LongWritable, BytesWritable>();
    }

    @SuppressWarnings("unchecked")
    @Override
    public void prepareToRead(RecordReader reader, PigSplit split)
            throws IOException {
        this.reader = (SequenceFileRecordReader) reader;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, location);
    }
}

It can be invoked from a Pig script, for example:

%DEFAULT includepath includes.pig
RUN $includepath;

seq = LOAD 'good.newest.pcap' using PcapFileLoader() as (a: long, b: bytearray);

DUMP seq;
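
Aside from the Pig path, a standalone reader along these lines can dump the records directly as a sanity check (a sketch only; the class name and argument handling are mine, and it assumes the Hadoop 1.x SequenceFile.Reader constructor):

import java.io.FileOutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;

public class SeqDump {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]); // e.g. the HDFS path of good.newest.pcap
        FileSystem fs = path.getFileSystem(conf);
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        LongWritable key = new LongWritable();
        BytesWritable value = new BytesWritable();
        FileOutputStream out = new FileOutputStream("output.pcap");
        try {
            while (reader.next(key, value)) {
                // Only the first getLength() bytes are valid; getBytes() returns
                // the reused backing buffer, which may be zero-padded beyond that.
                out.write(value.getBytes(), 0, value.getLength());
            }
        } finally {
            out.close();
            reader.close();
        }
    }
}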

Thanks!

Answer 1:

In this case the problem was in understanding the dataset. The solutions I had seen usually start parsing at the IP layer, but JPCAP also provides the Ethernet header, so the IP layer comes second in the chain. Finally, I also found some well-hidden good documentation here.
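
To illustrate the point: JPCAP hands over the whole Ethernet frame, so the IP header only starts after the 14-byte Ethernet II header. A minimal sketch (my own class and constant names; standard Ethernet II framing assumed, with the buffer starting at the frame):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class EthernetSkip {
    // Ethernet II: 6B destination MAC + 6B source MAC + 2B EtherType = 14 bytes.
    private static final int ETHERNET_HEADER_LEN = 14;

    /** Returns a buffer positioned at the IP header, or null if the frame is not IPv4. */
    public static ByteBuffer ipLayer(byte[] frame) {
        if (frame.length < ETHERNET_HEADER_LEN) {
            return null;
        }
        ByteBuffer buf = ByteBuffer.wrap(frame).order(ByteOrder.BIG_ENDIAN); // network byte order
        int etherType = buf.getShort(12) & 0xffff; // EtherType lives in bytes 12-13
        if (etherType != 0x0800) {                 // 0x0800 = IPv4
            return null;
        }
        buf.position(ETHERNET_HEADER_LEN);
        return buf.slice();                        // view starting at the IP header
    }
}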
