Exception while using HBase custom filter

Posted: 2017-09-01 05:10:12

I wrote an HBase custom filter that extends FilterBase and packaged it into a JAR. The filter looks like this:

public class MyFilter1 extends FilterBase implements Serializable {
    boolean filterRow = true;
    String srh;

    public MyFilter1(String str) {
        this.srh = str;
    }

    @Override
    public ReturnCode filterKeyValue(Cell c) throws IOException {
        String str = Bytes.toString(c.getValue());

        if (str.contains(srh)) {
            filterRow = false;
            return ReturnCode.INCLUDE;
        }

        filterRow = true;
        return ReturnCode.SKIP;
    }

    @Override
    public boolean filterRow() {
        return filterRow;
    }

    @Override
    public byte[] toByteArray() throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        ObjectOutputStream os = new ObjectOutputStream(out);
        os.writeObject(this);
        return out.toByteArray();
    }

    public static MyFilter1 parseFrom(final byte[] data) {
        ByteArrayInputStream in = new ByteArrayInputStream(data);
        MyFilter1 ans = null;
        ObjectInputStream is;
        try {
            is = new ObjectInputStream(in);
            ans = (MyFilter1) is.readObject();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return ans;
    }
}
After building the JAR file (MyFilter.jar), I placed it in the /usr/local/Hbase/lib/filters directory. Then I set

export HBASE_CLASSPATH="/usr/local/Hbase/lib/filters/MyFilter.jar"

in hbase-env.sh and restarted the HBase server. Then I used the custom filter from a Java program:

public static void main(String argv[]) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Connection con = ConnectionFactory.createConnection(conf);

    Table table = con.getTable(TableName.valueOf("stud"));

    Filter fl = new MyFilter1("uc");

    Scan sc = new Scan();
    sc.setFilter(fl);

    ResultScanner rs = table.getScanner(sc);

    for (Result r : rs)
        System.out.println(Bytes.toString(r.getValue(Bytes.toBytes("perData"), Bytes.toBytes("name"))));
}

But I got the following exception:

Exception in thread "main" org.apache.hadoop.hbase.DoNotRetryIOException: org.apache.hadoop.hbase.DoNotRetryIOException: java.lang.reflect.InvocationTargetException
at org.apache.hadoop.hbase.protobuf.ProtobufUtil.toFilter(ProtobufUtil.java:1478)
at org.apache.hadoop.hbase.protobuf.ProtobufUtil.toScan(ProtobufUtil.java:993)
at org.apache.hadoop.hbase.regionserver.RSRpcServices.scan(RSRpcServices.java:2396)
at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:33648)
at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2180)
at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112)
at org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop(RpcExecutor.java:133)
at org.apache.hadoop.hbase.ipc.RpcExecutor$1.run(RpcExecutor.java:108)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.hadoop.hbase.protobuf.ProtobufUtil.toFilter(ProtobufUtil.java:1474)
... 8 more
Caused by: org.apache.hadoop.hbase.exceptions.DeserializationException: parseFrom called on base Filter, but should be called on derived type
at org.apache.hadoop.hbase.filter.Filter.parseFrom(Filter.java:270)
... 13 more

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:95)
at org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRemoteException(ProtobufUtil.java:329)
at org.apache.hadoop.hbase.client.ScannerCallable.openScanner(ScannerCallable.java:408)
at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:204)
at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:65)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:210)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:364)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:338)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:136)
at org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture.run(ResultBoundedCompletionService.java:65)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.hbase.ipc.RemoteWithExtrasException(org.apache.hadoop.hbase.DoNotRetryIOException): org.apache.hadoop.hbase.DoNotRetryIOException: java.lang.reflect.InvocationTargetException
at org.apache.hadoop.hbase.protobuf.ProtobufUtil.toFilter(ProtobufUtil.java:1478)
at org.apache.hadoop.hbase.protobuf.ProtobufUtil.toScan(ProtobufUtil.java:993)
at org.apache.hadoop.hbase.regionserver.RSRpcServices.scan(RSRpcServices.java:2396)
at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:33648)
at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2180)
at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112)
at org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop(RpcExecutor.java:133)
at org.apache.hadoop.hbase.ipc.RpcExecutor$1.run(RpcExecutor.java:108)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.hadoop.hbase.protobuf.ProtobufUtil.toFilter(ProtobufUtil.java:1474)
... 8 more
Caused by: org.apache.hadoop.hbase.exceptions.DeserializationException: parseFrom called on base Filter, but should be called on derived type
at org.apache.hadoop.hbase.filter.Filter.parseFrom(Filter.java:270)
... 13 more

at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1267)
at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:227)
at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:336)
at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.scan(ClientProtos.java:34094)
at org.apache.hadoop.hbase.client.ScannerCallable.openScanner(ScannerCallable.java:400)
... 10 more

Can anyone help me out?


Answer 1:

Just adding one (perhaps important) detail to Abhishek Kumar's answer above. It appears that the serializer toByteArray() and the deserializer parseFrom(byte[] rawBytes) must be implemented via Google Protocol Buffers 2. A sample implementation follows.

AFilter.java

// whatever fields you need for the AFilter
long fieldA;
long fieldB;

/**
 * Transform this {@code AFilter} instance to a byte array for serialization.
 * @return raw bytes of this instance
 */
@Override
public byte[] toByteArray() {
    final FilterProtos.AFilter.Builder builder = FilterProtos.AFilter.newBuilder();
    builder.setFieldA(fieldA);
    builder.setFieldB(fieldB);

    return builder.build().toByteArray();
}

/**
 * De-serialize {@code AFilter} from {@code rawBytes}.
 *
 * @param rawBytes raw bytes of the filter
 * @return AFilter object
 * @throws DeserializationException
 */
public static AFilter parseFrom(final byte[] rawBytes)
        throws DeserializationException {

    try {
        FilterProtos.AFilter proto;
        proto = FilterProtos.AFilter.parseFrom(rawBytes);

        return new AFilter(proto.getFieldA(), proto.getFieldB());
    } catch (InvalidProtocolBufferException ex) {
        throw new DeserializationException(ex);
    }
}

Filters.proto

option java_package = "my.java.package";
option java_outer_classname = "FilterProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message AFilter {
    required uint64 fieldA = 1;
    required uint64 fieldB = 2;
}

Answer 2:

As you can see in the exception stack, the error is caused by "parseFrom called on base Filter, but should be called on derived type".

This means you must also implement parseFrom in your custom filter class.

In addition, you may need to implement toByteArray, since the two are used together.
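One way to satisfy this contract without Java object serialization is to serialize only the state the filter actually needs (here, the search string) and reconstruct the filter from it. The sketch below is standalone illustrative Java (the class name MyFilter1Sketch is made up for this example): in a real filter the class would extend FilterBase and parseFrom would throw DeserializationException on bad input, but those HBase types are omitted here so the (de)serialization logic stands on its own.

```java
import java.nio.charset.StandardCharsets;

public class MyFilter1Sketch {
    private final String srh;

    public MyFilter1Sketch(String srh) {
        this.srh = srh;
    }

    public String getSrh() {
        return srh;
    }

    // Serialize only the filter's state -- the search string -- rather
    // than Java-serializing the whole object graph.
    public byte[] toByteArray() {
        return srh.getBytes(StandardCharsets.UTF_8);
    }

    // The region server locates this static method by reflection, so the
    // signature must be: public static <YourFilter> parseFrom(byte[]).
    public static MyFilter1Sketch parseFrom(final byte[] data) {
        return new MyFilter1Sketch(new String(data, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // Round-trip: serialize, then rebuild the filter from the bytes.
        MyFilter1Sketch copy = parseFrom(new MyFilter1Sketch("uc").toByteArray());
        System.out.println(copy.getSrh()); // prints "uc"
    }
}
```

The key point is that toByteArray and parseFrom must agree on the wire format: whatever bytes one writes, the other must be able to turn back into an equivalent filter instance on the region server.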

Comments:

Thanks for your answer. I am new to HBase. Is there a tutorial or guide on how to implement these methods?

There is no better example than the HBase library filters that derive from FilterBase. For example: github.com/apache/hbase/blob/master/hbase-client/src/main/java/…

I have implemented parseFrom and toByteArray (see the edited post)... but I am still getting the same error.
