Flink 写入 HBase

Posted 一个肉团子

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink写入Hbase相关的知识,希望对你有一定的参考价值。

1、在构建实时数仓时,通常会把 DIM(维度)层的数据存入 HBase。这样做有两个好处:一是可以利用 HBase 写入的幂等性——维度表的数据基本上都具有唯一性,同一 rowkey 重复写入相当于覆盖原有数据;二是在对实时性有要求的场景下,可以按 rowkey 做点查关联,查询效率有一定保障。

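部分代码如下。注意:下面这段代码并不是写入 HBase 的 sink,而是一个带 open/close 生命周期的 map 函数——在 map 中按 rowkey 从 HBase 维度表点查,并把查到的状态关联到业务数据上: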

// HBase client handles used for the dimension lookups in map().
    // hbaseConnection and hbaseAdmin are assigned in open(); all three are closed in close().
    // transient: live client objects, not meant to be serialized together with the function.
    // (The field was previously declared twice, which does not compile.)
    private transient Connection hbaseConnection;

    private transient Admin hbaseAdmin;

    private transient Table driBaseInfoResTable;


    @Override
    public void open(Configuration parameters) throws Exception 
        super.open(parameters);

        org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();

        configuration.set("hbase.zookeeper.quorum", "192.168.11.1,192.168.11.2,192.168.11.3");

        hbaseConnection = ConnectionFactory.createConnection(configuration);

        hbaseAdmin = hbaseConnection.getAdmin();


        if (!hbaseAdmin.tableExists(TableName.valueOf("test:dim_dri_base_info"))) 
            log.error("hbase table not exists: ", "test:dim_dri_base_info");
        

    


    /**
     * @param value
     * @return 返回的是 把数据打平的一条元祖数据 Tuple6<String,Integer, Integer, Integer,Integer,Integer>
     */
    @Override
    public Tuple6<String, Integer, Integer, Integer, Integer, Long> map(UpOrderInfo value) throws Exception 


        Tuple6 tuple6 = new Tuple6<String, Integer, Integer, Integer, Integer, Long>();
        Integer idCardTaurus = -100;
        Integer vehTaurus = -100;
        String time = value.getUpDataTime();

        byte[] opDriverUuid = value.getDriverUuid().concat("_op").getBytes();

        try 

            Get getResOp = new Get(opDriverUuid);

            getResOp.addFamily("CF1".getBytes());

            Result resultResOp = driResOPTable.get(getResOp);

            List<Cell> columnResOpCells = resultResOp.getColumnCells("CF1".getBytes(), "res_op".getBytes());

            String resUuid = "";

            for (Cell cell : columnResOpCells) 
                resUuid = Bytes.toString(CellUtil.cloneValue(cell));
            

            Get driTaurus = new Get(resUuid.getBytes());

            driTaurus.addFamily("CF1".getBytes());


            Result resultBaseInfo = driBaseInfoResTable.get(driTaurus);

            List<Cell> columnTaurusStatusCells = resultBaseInfo.getColumnCells("CF1".getBytes(), "driver_taurus_status".getBytes());

            for (Cell cell : columnTaurusStatusCells) 
                value.setIdCardTaurusStatus(Integer.parseInt(Bytes.toString(CellUtil.cloneValue(cell))));
            

            Get getVehNo = new Get(Utils.reverse(value.getVehicleNo()).getBytes());

            getVehNo.addFamily("CF1".getBytes());
            Result resultVehNo = vehStatusTable.get(getVehNo);

            List<Cell> columnVehNoCells = resultVehNo.getColumnCells("CF1".getBytes(), "taurus_status".getBytes());

            for (Cell cell : columnVehNoCells) 
                value.setVehicleTaurusStatus(Integer.parseInt(Bytes.toString(CellUtil.cloneValue(cell))));
            

            List<Cell> columnLocalVehNoCells = resultVehNo.getColumnCells("CF1".getBytes(), "status_".getBytes());

            for (Cell cell : columnLocalVehNoCells) 
                value.setVehicleGovStatus(Integer.parseInt(Bytes.toString(CellUtil.cloneValue(cell))));
            


            idCardTaurus = value.getIdCardTaurusStatus() != null ? value.getIdCardTaurusStatus() : -100;

            vehTaurus = value.getVehicleTaurusStatus() != null ? value.getVehicleTaurusStatus() : -100;


         catch (Exception e) 
            log.error("操作hbase出的错误为:" + e);
        

        try 
            tuple6.setFields(time, value.getUploadStatus(), value.getIsIntercept(), idCardTaurus, vehTaurus, 1L);
         catch (Exception e) 
            log.error("错误日志为:" + e);
            log.error("错误的数据为:" + value.toString());
        

        return tuple6;
    


    @Override
    public void close() throws Exception 
      

        if (driBaseInfoResTable != null) 
            driBaseInfoResTable.close();
        

        if (hbaseAdmin != null) 
            hbaseAdmin.close();
        

        if (hbaseConnection != null) 
            hbaseConnection.close();
        

        super.close();
    

以上是关于Flink写入Hbase的主要内容,如果未能解决你的问题,请参考以下文章

HMaster组件异常导致Flink写HBase任务频繁重启问题解决

是否可以在流式 flink 作业中创建批处理 flink 作业?

20221222蔚来面试

20221222蔚来面试

20221222蔚来面试

flink与hbase交互